/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A block-based {@link FileSystem} backed by
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 * </p>
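 *
 * <p>A minimal usage sketch. The bucket name and paths below are
 * illustrative, and S3 credentials are assumed to be configured already
 * (for example via {@code fs.s3.awsAccessKeyId} and
 * {@code fs.s3.awsSecretAccessKey}):</p>
 * <pre>
 *   Configuration conf = new Configuration();
 *   // "example-bucket" is a hypothetical bucket name
 *   FileSystem fs = FileSystem.get(URI.create("s3://example-bucket/"), conf);
 *   FSDataOutputStream out = fs.create(new Path("/user/alice/data.txt"));
 *   out.writeUTF("hello");
 *   out.close();
 * </pre>
 *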
 * @see NativeS3FileSystem
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class S3FileSystem extends FileSystem {

  private URI uri;

  private FileSystemStore store;

  private Path workingDir;

  public S3FileSystem() {
    // set store in initialize()
  }

  public S3FileSystem(FileSystemStore store) {
    this.store = store;
  }

  @Override
  public URI getUri() {
    return uri;
  }

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir =
      new Path("/user", System.getProperty("user.name")).makeQualified(this);
  }

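  /**
   * Create the default {@link Jets3tFileSystemStore}, wrapped in a retry
   * proxy so that block store and retrieve operations are retried on
   * {@link IOException} and {@link S3Exception}, as configured by
   * {@code fs.s3.maxRetries} and {@code fs.s3.sleepTimeSeconds}.
   */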
  private static FileSystemStore createDefaultStore(Configuration conf) {
    FileSystemStore store = new Jets3tFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt("fs.s3.maxRetries", 4),
        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("storeBlock", methodPolicy);
    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);

    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
        store, methodNameToPolicyMap);
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public void setWorkingDirectory(Path dir) {
    workingDir = makeAbsolute(dir);
  }

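  /**
   * Resolve a possibly relative path against the current working directory.
   */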
  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * @param permission Currently ignored.
   */
  @Override
  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(path);
    List<Path> paths = new ArrayList<Path>();
    do {
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (Path p : paths) {
      result &= mkdir(p);
    }
    return result;
  }

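  /**
   * Create a single directory inode if it does not already exist.
   *
   * @throws IOException if the path already exists as a file
   */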
  private boolean mkdir(Path path) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
    } else if (inode.isFile()) {
      throw new IOException(String.format(
          "Can't make directory for path %s since it is a file.",
          absolutePath));
    }
    return true;
  }

  @Override
  public boolean isFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      return false;
    }
    return inode.isFile();
  }

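  /**
   * Retrieve the inode for a path that is expected to be an existing file.
   *
   * @throws IOException if the path does not exist or is a directory
   */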
  private INode checkFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      throw new IOException("No such file: " + path);
    }
    if (inode.isDirectory()) {
      throw new IOException("Path " + path + " is a directory.");
    }
    return inode;
  }

  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }
    if (inode.isFile()) {
      return new FileStatus[] {
        new S3FileStatus(f.makeQualified(this), inode)
      };
    }
    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
    for (Path p : store.listSubPaths(absolutePath)) {
      ret.add(getFileStatus(p.makeQualified(this)));
    }
    return ret.toArray(new FileStatus[0]);
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * @param permission Currently ignored.
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize,
      short replication, long blockSize, Progressable progress)
      throws IOException {

    INode inode = store.retrieveINode(makeAbsolute(file));
    if (inode != null) {
      if (overwrite) {
        delete(file, true);
      } else {
        throw new IOException("File already exists: " + file);
      }
    } else {
      Path parent = file.getParent();
      if (parent != null) {
        if (!mkdirs(parent)) {
          throw new IOException("Mkdirs failed to create " + parent.toString());
        }
      }
    }
    return new FSDataOutputStream(
        new S3OutputStream(getConf(), store, makeAbsolute(file),
                           blockSize, progress, bufferSize),
        statistics);
  }

  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    INode inode = checkFile(path);
    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
                                                   statistics));
  }

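  /**
   * Rename a file or directory. If the destination is an existing directory,
   * the source is moved into it. A missing source, a destination that
   * already exists, or a destination parent that is missing or a file
   * results in {@code false} rather than an exception.
   */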
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    Path absoluteSrc = makeAbsolute(src);
    INode srcINode = store.retrieveINode(absoluteSrc);
    if (srcINode == null) {
      // src path doesn't exist
      return false;
    }
    Path absoluteDst = makeAbsolute(dst);
    INode dstINode = store.retrieveINode(absoluteDst);
    if (dstINode != null && dstINode.isDirectory()) {
      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
      dstINode = store.retrieveINode(absoluteDst);
    }
    if (dstINode != null) {
      // dst path already exists - can't overwrite
      return false;
    }
    Path dstParent = absoluteDst.getParent();
    if (dstParent != null) {
      INode dstParentINode = store.retrieveINode(dstParent);
      if (dstParentINode == null || dstParentINode.isFile()) {
        // dst parent doesn't exist or is a file
        return false;
      }
    }
    return renameRecursive(absoluteSrc, absoluteDst);
  }

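  /**
   * Move the inode at {@code src} to {@code dst}, and, for directories,
   * move every inode stored under {@code src} to the corresponding path
   * under {@code dst}.
   */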
  private boolean renameRecursive(Path src, Path dst) throws IOException {
    INode srcINode = store.retrieveINode(src);
    store.storeINode(dst, srcINode);
    store.deleteINode(src);
    if (srcINode.isDirectory()) {
      for (Path oldSrc : store.listDeepSubPaths(src)) {
        INode inode = store.retrieveINode(oldSrc);
        if (inode == null) {
          return false;
        }
        String oldSrcPath = oldSrc.toUri().getPath();
        String srcPath = src.toUri().getPath();
        String dstPath = dst.toUri().getPath();
        // Replace the src prefix with dst. Plain string arithmetic is used
        // here rather than String#replaceFirst, which would interpret the
        // path as a regular expression.
        Path newDst = new Path(dstPath + oldSrcPath.substring(srcPath.length()));
        store.storeINode(newDst, inode);
        store.deleteINode(oldSrc);
      }
    }
    return true;
  }

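  /**
   * Delete a file or directory. A non-empty directory is only deleted when
   * {@code recursive} is set, otherwise an {@link IOException} is thrown;
   * deleting a path that does not exist returns {@code false}.
   */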
  @Override
  public boolean delete(Path path, boolean recursive) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      return false;
    }
    if (inode.isFile()) {
      store.deleteINode(absolutePath);
      for (Block block : inode.getBlocks()) {
        store.deleteBlock(block);
      }
    } else {
      FileStatus[] contents = null;
      try {
        contents = listStatus(absolutePath);
      } catch (FileNotFoundException fnfe) {
        return false;
      }

      if ((contents.length != 0) && (!recursive)) {
        throw new IOException("Directory " + path.toString()
            + " is not empty.");
      }
      for (FileStatus p : contents) {
        if (!delete(p.getPath(), recursive)) {
          return false;
        }
      }
      store.deleteINode(absolutePath);
    }
    return true;
  }

  /**
   * Return a {@link FileStatus} describing the given path.
   *
   * @throws FileNotFoundException if the path does not exist
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(f));
    if (inode == null) {
      throw new FileNotFoundException(f + ": No such file or directory.");
    }
    return new S3FileStatus(f.makeQualified(this), inode);
  }

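  /**
   * Default block size, configurable via {@code fs.s3.block.size}
   * (64 MB if unset).
   */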
  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
  }

  // diagnostic methods

  void dump() throws IOException {
    store.dump();
  }

  void purge() throws IOException {
    store.purge();
  }

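  /**
   * FileStatus for S3 file systems.
   */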
  private static class S3FileStatus extends FileStatus {

    S3FileStatus(Path f, INode inode) throws IOException {
      super(findLength(inode), inode.isDirectory(), 1,
            findBlocksize(inode), 0, f);
    }

    private static long findLength(INode inode) {
      if (!inode.isDirectory()) {
        long length = 0L;
        for (Block block : inode.getBlocks()) {
          length += block.getLength();
        }
        return length;
      }
      return 0;
    }

    private static long findBlocksize(INode inode) {
      final Block[] ret = inode.getBlocks();
      // Guard against a null or empty block array (e.g. a zero-length file)
      // before dereferencing the first block.
      return (ret == null || ret.length == 0) ? 0L : ret[0].getLength();
    }
  }
}