001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs.s3;
020    
021    import java.io.FileNotFoundException;
022    import java.io.IOException;
023    import java.net.URI;
024    import java.util.ArrayList;
025    import java.util.HashMap;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.concurrent.TimeUnit;
029    
030    import org.apache.hadoop.classification.InterfaceAudience;
031    import org.apache.hadoop.classification.InterfaceStability;
032    import org.apache.hadoop.conf.Configuration;
033    import org.apache.hadoop.fs.FSDataInputStream;
034    import org.apache.hadoop.fs.FSDataOutputStream;
035    import org.apache.hadoop.fs.FileStatus;
036    import org.apache.hadoop.fs.FileSystem;
037    import org.apache.hadoop.fs.Path;
038    import org.apache.hadoop.fs.permission.FsPermission;
039    import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
040    import org.apache.hadoop.io.retry.RetryPolicies;
041    import org.apache.hadoop.io.retry.RetryPolicy;
042    import org.apache.hadoop.io.retry.RetryProxy;
043    import org.apache.hadoop.util.Progressable;
044    
045    /**
046     * <p>
047     * A block-based {@link FileSystem} backed by
048     * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
049     * </p>
050     * @see NativeS3FileSystem
051     */
052    @InterfaceAudience.Public
053    @InterfaceStability.Stable
054    public class S3FileSystem extends FileSystem {
055    
056      private URI uri;
057    
058      private FileSystemStore store;
059    
060      private Path workingDir;
061    
062      public S3FileSystem() {
063        // set store in initialize()
064      }
065      
066      public S3FileSystem(FileSystemStore store) {
067        this.store = store;
068      }
069    
070      /**
071       * Return the protocol scheme for the FileSystem.
072       * <p/>
073       *
074       * @return <code>s3</code>
075       */
076      @Override
077      public String getScheme() {
078        return "s3";
079      }
080    
081      @Override
082      public URI getUri() {
083        return uri;
084      }
085    
086      @Override
087      public void initialize(URI uri, Configuration conf) throws IOException {
088        super.initialize(uri, conf);
089        if (store == null) {
090          store = createDefaultStore(conf);
091        }
092        store.initialize(uri, conf);
093        setConf(conf);
094        this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
095        this.workingDir =
096          new Path("/user", System.getProperty("user.name")).makeQualified(this);
097      }  
098    
099      private static FileSystemStore createDefaultStore(Configuration conf) {
100        FileSystemStore store = new Jets3tFileSystemStore();
101        
102        RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
103                                                                                   conf.getInt("fs.s3.maxRetries", 4),
104                                                                                   conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
105        Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
106          new HashMap<Class<? extends Exception>, RetryPolicy>();
107        exceptionToPolicyMap.put(IOException.class, basePolicy);
108        exceptionToPolicyMap.put(S3Exception.class, basePolicy);
109        
110        RetryPolicy methodPolicy = RetryPolicies.retryByException(
111                                                                  RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
112        Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
113        methodNameToPolicyMap.put("storeBlock", methodPolicy);
114        methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
115        
116        return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
117                                                   store, methodNameToPolicyMap);
118      }
119    
120      @Override
121      public Path getWorkingDirectory() {
122        return workingDir;
123      }
124    
125      @Override
126      public void setWorkingDirectory(Path dir) {
127        workingDir = makeAbsolute(dir);
128      }
129    
130      private Path makeAbsolute(Path path) {
131        if (path.isAbsolute()) {
132          return path;
133        }
134        return new Path(workingDir, path);
135      }
136    
137      /**
138       * @param permission Currently ignored.
139       */
140      @Override
141      public boolean mkdirs(Path path, FsPermission permission) throws IOException {
142        Path absolutePath = makeAbsolute(path);
143        List<Path> paths = new ArrayList<Path>();
144        do {
145          paths.add(0, absolutePath);
146          absolutePath = absolutePath.getParent();
147        } while (absolutePath != null);
148        
149        boolean result = true;
150        for (Path p : paths) {
151          result &= mkdir(p);
152        }
153        return result;
154      }
155      
156      private boolean mkdir(Path path) throws IOException {
157        Path absolutePath = makeAbsolute(path);
158        INode inode = store.retrieveINode(absolutePath);
159        if (inode == null) {
160          store.storeINode(absolutePath, INode.DIRECTORY_INODE);
161        } else if (inode.isFile()) {
162          throw new IOException(String.format(
163              "Can't make directory for path %s since it is a file.",
164              absolutePath));
165        }
166        return true;
167      }
168    
169      @Override
170      public boolean isFile(Path path) throws IOException {
171        INode inode = store.retrieveINode(makeAbsolute(path));
172        if (inode == null) {
173          return false;
174        }
175        return inode.isFile();
176      }
177    
178      private INode checkFile(Path path) throws IOException {
179        INode inode = store.retrieveINode(makeAbsolute(path));
180        if (inode == null) {
181          throw new IOException("No such file.");
182        }
183        if (inode.isDirectory()) {
184          throw new IOException("Path " + path + " is a directory.");
185        }
186        return inode;
187      }
188    
189      @Override
190      public FileStatus[] listStatus(Path f) throws IOException {
191        Path absolutePath = makeAbsolute(f);
192        INode inode = store.retrieveINode(absolutePath);
193        if (inode == null) {
194          throw new FileNotFoundException("File " + f + " does not exist.");
195        }
196        if (inode.isFile()) {
197          return new FileStatus[] {
198            new S3FileStatus(f.makeQualified(this), inode)
199          };
200        }
201        ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
202        for (Path p : store.listSubPaths(absolutePath)) {
203          ret.add(getFileStatus(p.makeQualified(this)));
204        }
205        return ret.toArray(new FileStatus[0]);
206      }
207    
208      /** This optional operation is not yet supported. */
209      @Override
210      public FSDataOutputStream append(Path f, int bufferSize,
211          Progressable progress) throws IOException {
212        throw new IOException("Not supported");
213      }
214    
215      /**
216       * @param permission Currently ignored.
217       */
218      @Override
219      public FSDataOutputStream create(Path file, FsPermission permission,
220          boolean overwrite, int bufferSize,
221          short replication, long blockSize, Progressable progress)
222        throws IOException {
223    
224        INode inode = store.retrieveINode(makeAbsolute(file));
225        if (inode != null) {
226          if (overwrite) {
227            delete(file, true);
228          } else {
229            throw new IOException("File already exists: " + file);
230          }
231        } else {
232          Path parent = file.getParent();
233          if (parent != null) {
234            if (!mkdirs(parent)) {
235              throw new IOException("Mkdirs failed to create " + parent.toString());
236            }
237          }      
238        }
239        return new FSDataOutputStream
240            (new S3OutputStream(getConf(), store, makeAbsolute(file),
241                                blockSize, progress, bufferSize),
242             statistics);
243      }
244    
245      @Override
246      public FSDataInputStream open(Path path, int bufferSize) throws IOException {
247        INode inode = checkFile(path);
248        return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
249                                                       statistics));
250      }
251    
252      @Override
253      public boolean rename(Path src, Path dst) throws IOException {
254        Path absoluteSrc = makeAbsolute(src);
255        INode srcINode = store.retrieveINode(absoluteSrc);
256        if (srcINode == null) {
257          // src path doesn't exist
258          return false; 
259        }
260        Path absoluteDst = makeAbsolute(dst);
261        INode dstINode = store.retrieveINode(absoluteDst);
262        if (dstINode != null && dstINode.isDirectory()) {
263          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
264          dstINode = store.retrieveINode(absoluteDst);
265        }
266        if (dstINode != null) {
267          // dst path already exists - can't overwrite
268          return false;
269        }
270        Path dstParent = absoluteDst.getParent();
271        if (dstParent != null) {
272          INode dstParentINode = store.retrieveINode(dstParent);
273          if (dstParentINode == null || dstParentINode.isFile()) {
274            // dst parent doesn't exist or is a file
275            return false;
276          }
277        }
278        return renameRecursive(absoluteSrc, absoluteDst);
279      }
280      
281      private boolean renameRecursive(Path src, Path dst) throws IOException {
282        INode srcINode = store.retrieveINode(src);
283        store.storeINode(dst, srcINode);
284        store.deleteINode(src);
285        if (srcINode.isDirectory()) {
286          for (Path oldSrc : store.listDeepSubPaths(src)) {
287            INode inode = store.retrieveINode(oldSrc);
288            if (inode == null) {
289              return false;
290            }
291            String oldSrcPath = oldSrc.toUri().getPath();
292            String srcPath = src.toUri().getPath();
293            String dstPath = dst.toUri().getPath();
294            Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
295            store.storeINode(newDst, inode);
296            store.deleteINode(oldSrc);
297          }
298        }
299        return true;
300      }
301    
302      @Override
303      public boolean delete(Path path, boolean recursive) throws IOException {
304       Path absolutePath = makeAbsolute(path);
305       INode inode = store.retrieveINode(absolutePath);
306       if (inode == null) {
307         return false;
308       }
309       if (inode.isFile()) {
310         store.deleteINode(absolutePath);
311         for (Block block: inode.getBlocks()) {
312           store.deleteBlock(block);
313         }
314       } else {
315         FileStatus[] contents = null; 
316         try {
317           contents = listStatus(absolutePath);
318         } catch(FileNotFoundException fnfe) {
319           return false;
320         }
321    
322         if ((contents.length !=0) && (!recursive)) {
323           throw new IOException("Directory " + path.toString() 
324               + " is not empty.");
325         }
326         for (FileStatus p:contents) {
327           if (!delete(p.getPath(), recursive)) {
328             return false;
329           }
330         }
331         store.deleteINode(absolutePath);
332       }
333       return true;
334      }
335      
336      /**
337       * FileStatus for S3 file systems. 
338       */
339      @Override
340      public FileStatus getFileStatus(Path f)  throws IOException {
341        INode inode = store.retrieveINode(makeAbsolute(f));
342        if (inode == null) {
343          throw new FileNotFoundException(f + ": No such file or directory.");
344        }
345        return new S3FileStatus(f.makeQualified(this), inode);
346      }
347      
348      @Override
349      public long getDefaultBlockSize() {
350        return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
351      }
352    
353      @Override
354      public String getCanonicalServiceName() {
355        // Does not support Token
356        return null;
357      }
358    
359      // diagnostic methods
360    
361      void dump() throws IOException {
362        store.dump();
363      }
364    
365      void purge() throws IOException {
366        store.purge();
367      }
368    
369      private static class S3FileStatus extends FileStatus {
370    
371        S3FileStatus(Path f, INode inode) throws IOException {
372          super(findLength(inode), inode.isDirectory(), 1,
373                findBlocksize(inode), 0, f);
374        }
375    
376        private static long findLength(INode inode) {
377          if (!inode.isDirectory()) {
378            long length = 0L;
379            for (Block block : inode.getBlocks()) {
380              length += block.getLength();
381            }
382            return length;
383          }
384          return 0;
385        }
386    
387        private static long findBlocksize(INode inode) {
388          final Block[] ret = inode.getBlocks();
389          return ret == null ? 0L : ret[0].getLength();
390        }
391      }
392    }