/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A block-based {@link FileSystem} backed by
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 * </p>
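 * <p>
 * A minimal usage sketch. The {@code fs.s3.*} property names below are the
 * conventional credential keys for this scheme; treat them as an assumption
 * and verify them against your Hadoop version.
 * </p>
 * <pre>
 *   Configuration conf = new Configuration();
 *   conf.set("fs.s3.awsAccessKeyId", "...");      // assumed property name
 *   conf.set("fs.s3.awsSecretAccessKey", "...");  // assumed property name
 *   FileSystem fs = FileSystem.get(URI.create("s3://bucket"), conf);
 *   fs.mkdirs(new Path("/user/alice"));
 * </pre>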
 * @see NativeS3FileSystem
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class S3FileSystem extends FileSystem {

  private URI uri;

  private FileSystemStore store;

  private Path workingDir;

  public S3FileSystem() {
    // set store in initialize()
  }

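  /**
   * Create an instance backed by the given {@link FileSystemStore} rather
   * than the default Jets3t-backed store created in
   * {@link #initialize(URI, Configuration)}.
   */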
  public S3FileSystem(FileSystemStore store) {
    this.store = store;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>s3</code>
   */
  @Override
  public String getScheme() {
    return "s3";
  }

  @Override
  public URI getUri() {
    return uri;
  }

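  /**
   * Initialize the backing {@link FileSystemStore} (creating the default
   * Jets3t-backed store if none was supplied) and set the working directory
   * to the current user's directory under {@code /user}.
   */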
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir =
      new Path("/user", System.getProperty("user.name")).makeQualified(this);
  }

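  /**
   * Create the default {@link Jets3tFileSystemStore} and wrap it in a
   * {@link RetryProxy} so that failed {@code storeBlock} and
   * {@code retrieveBlock} calls are retried, up to {@code fs.s3.maxRetries}
   * times with {@code fs.s3.sleepTimeSeconds} between attempts.
   */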
  private static FileSystemStore createDefaultStore(Configuration conf) {
    FileSystemStore store = new Jets3tFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt("fs.s3.maxRetries", 4),
        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
      new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("storeBlock", methodPolicy);
    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);

    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
        store, methodNameToPolicyMap);
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public void setWorkingDirectory(Path dir) {
    workingDir = makeAbsolute(dir);
  }

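  /** Resolve a possibly relative path against the current working directory. */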
  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * @param permission Currently ignored.
   */
  @Override
  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(path);
    List<Path> paths = new ArrayList<Path>();
    do {
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (Path p : paths) {
      result &= mkdir(p);
    }
    return result;
  }

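  /**
   * Create a single directory INode if it does not already exist; fails if
   * the path is already present as a file.
   */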
  private boolean mkdir(Path path) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
    } else if (inode.isFile()) {
      throw new IOException(String.format(
          "Can't make directory for path %s since it is a file.",
          absolutePath));
    }
    return true;
  }

  @Override
  public boolean isFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      return false;
    }
    return inode.isFile();
  }

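  /** Look up the INode for a path that is expected to be an existing file. */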
  private INode checkFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      throw new FileNotFoundException("No such file: " + path);
    }
    if (inode.isDirectory()) {
      throw new IOException("Path " + path + " is a directory.");
    }
    return inode;
  }

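  /**
   * List the immediate children of a directory; for a file path, return a
   * single {@link FileStatus} describing the file itself.
   */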
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }
    if (inode.isFile()) {
      return new FileStatus[] {
        new S3FileStatus(f.makeQualified(this), inode)
      };
    }
    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
    for (Path p : store.listSubPaths(absolutePath)) {
      ret.add(getFileStatus(p.makeQualified(this)));
    }
    return ret.toArray(new FileStatus[0]);
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * @param permission Currently ignored.
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize,
      short replication, long blockSize, Progressable progress)
    throws IOException {

    INode inode = store.retrieveINode(makeAbsolute(file));
    if (inode != null) {
      if (overwrite) {
        delete(file, true);
      } else {
        throw new FileAlreadyExistsException("File already exists: " + file);
      }
    } else {
      Path parent = file.getParent();
      if (parent != null) {
        if (!mkdirs(parent)) {
          throw new IOException("Mkdirs failed to create " + parent.toString());
        }
      }
    }
    return new FSDataOutputStream(
        new S3OutputStream(getConf(), store, makeAbsolute(file),
                           blockSize, progress, bufferSize),
        statistics);
  }

  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    INode inode = checkFile(path);
    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
                                                   statistics));
  }

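  /**
   * Rename by rewriting INode keys in the store; renaming onto an existing
   * directory moves the source beneath it. Returns false rather than throwing
   * when the source is missing, the resolved destination already exists, or
   * the destination parent is missing or is a file.
   */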
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    Path absoluteSrc = makeAbsolute(src);
    INode srcINode = store.retrieveINode(absoluteSrc);
    if (srcINode == null) {
      // src path doesn't exist
      return false;
    }
    Path absoluteDst = makeAbsolute(dst);
    INode dstINode = store.retrieveINode(absoluteDst);
    if (dstINode != null && dstINode.isDirectory()) {
      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
      dstINode = store.retrieveINode(absoluteDst);
    }
    if (dstINode != null) {
      // dst path already exists - can't overwrite
      return false;
    }
    Path dstParent = absoluteDst.getParent();
    if (dstParent != null) {
      INode dstParentINode = store.retrieveINode(dstParent);
      if (dstParentINode == null || dstParentINode.isFile()) {
        // dst parent doesn't exist or is a file
        return false;
      }
    }
    return renameRecursive(absoluteSrc, absoluteDst);
  }

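  /**
   * Move the INode for {@code src} to {@code dst}, then rewrite the key of
   * every descendant INode; the block data itself is left in place.
   */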
  private boolean renameRecursive(Path src, Path dst) throws IOException {
    INode srcINode = store.retrieveINode(src);
    store.storeINode(dst, srcINode);
    store.deleteINode(src);
    if (srcINode.isDirectory()) {
      for (Path oldSrc : store.listDeepSubPaths(src)) {
        INode inode = store.retrieveINode(oldSrc);
        if (inode == null) {
          return false;
        }
        String oldSrcPath = oldSrc.toUri().getPath();
        String srcPath = src.toUri().getPath();
        String dstPath = dst.toUri().getPath();
        // Replace the src prefix with the dst prefix; plain string handling
        // avoids treating paths containing regex metacharacters as patterns.
        Path newDst = new Path(dstPath + oldSrcPath.substring(srcPath.length()));
        store.storeINode(newDst, inode);
        store.deleteINode(oldSrc);
      }
    }
    return true;
  }

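  /**
   * Delete a file (removing its INode and backing blocks) or a directory.
   * Deleting a non-empty directory requires {@code recursive}, otherwise an
   * IOException is thrown; returns false if the path does not exist.
   */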
  @Override
  public boolean delete(Path path, boolean recursive) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      return false;
    }
    if (inode.isFile()) {
      store.deleteINode(absolutePath);
      for (Block block : inode.getBlocks()) {
        store.deleteBlock(block);
      }
    } else {
      FileStatus[] contents = null;
      try {
        contents = listStatus(absolutePath);
      } catch (FileNotFoundException fnfe) {
        return false;
      }

      if ((contents.length != 0) && (!recursive)) {
        throw new IOException("Directory " + path.toString()
            + " is not empty.");
      }
      for (FileStatus p : contents) {
        if (!delete(p.getPath(), recursive)) {
          return false;
        }
      }
      store.deleteINode(absolutePath);
    }
    return true;
  }

  /**
   * Return the {@link FileStatus} for a path, built from the stored INode.
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(f));
    if (inode == null) {
      throw new FileNotFoundException(f + ": No such file or directory.");
    }
    return new S3FileStatus(f.makeQualified(this), inode);
  }

  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }

  // diagnostic methods

  void dump() throws IOException {
    store.dump();
  }

  void purge() throws IOException {
    store.purge();
  }

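  /**
   * {@link FileStatus} for S3: the length is the sum of the INode's block
   * lengths and the block size is taken from the first block.
   */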
  private static class S3FileStatus extends FileStatus {

    S3FileStatus(Path f, INode inode) throws IOException {
      super(findLength(inode), inode.isDirectory(), 1,
            findBlocksize(inode), 0, f);
    }

    private static long findLength(INode inode) {
      if (!inode.isDirectory()) {
        long length = 0L;
        for (Block block : inode.getBlocks()) {
          length += block.getLength();
        }
        return length;
      }
      return 0;
    }

    private static long findBlocksize(INode inode) {
      final Block[] ret = inode.getBlocks();
      // Guard against a missing or empty block list (e.g. a directory or an
      // empty file) to avoid an ArrayIndexOutOfBoundsException.
      return (ret == null || ret.length == 0) ? 0L : ret[0].getLength();
    }
  }
}