001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.URI;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.fs.FileAlreadyExistsException;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.fs.permission.FsPermission;
040import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
041import org.apache.hadoop.io.retry.RetryPolicies;
042import org.apache.hadoop.io.retry.RetryPolicy;
043import org.apache.hadoop.io.retry.RetryProxy;
044import org.apache.hadoop.util.Progressable;
045
046/**
047 * A block-based {@link FileSystem} backed by
048 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
049 *
050 * @see NativeS3FileSystem
051 */
052@InterfaceAudience.Public
053@InterfaceStability.Stable
054public class S3FileSystem extends FileSystem {
055
056  private URI uri;
057
058  private FileSystemStore store;
059
060  private Path workingDir;
061
062  public S3FileSystem() {
063    // set store in initialize()
064  }
065  
066  public S3FileSystem(FileSystemStore store) {
067    this.store = store;
068  }
069
070  /**
071   * Return the protocol scheme for the FileSystem.
072   *
073   * @return <code>s3</code>
074   */
075  @Override
076  public String getScheme() {
077    return "s3";
078  }
079
080  @Override
081  public URI getUri() {
082    return uri;
083  }
084
085  @Override
086  public void initialize(URI uri, Configuration conf) throws IOException {
087    super.initialize(uri, conf);
088    if (store == null) {
089      store = createDefaultStore(conf);
090    }
091    store.initialize(uri, conf);
092    setConf(conf);
093    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
094    this.workingDir =
095      new Path("/user", System.getProperty("user.name")).makeQualified(this);
096  }  
097
098  private static FileSystemStore createDefaultStore(Configuration conf) {
099    FileSystemStore store = new Jets3tFileSystemStore();
100    
101    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
102                                                                               conf.getInt("fs.s3.maxRetries", 4),
103                                                                               conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
104    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
105      new HashMap<Class<? extends Exception>, RetryPolicy>();
106    exceptionToPolicyMap.put(IOException.class, basePolicy);
107    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
108    
109    RetryPolicy methodPolicy = RetryPolicies.retryByException(
110                                                              RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
111    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
112    methodNameToPolicyMap.put("storeBlock", methodPolicy);
113    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
114    
115    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
116                                               store, methodNameToPolicyMap);
117  }
118
119  @Override
120  public Path getWorkingDirectory() {
121    return workingDir;
122  }
123
124  @Override
125  public void setWorkingDirectory(Path dir) {
126    workingDir = makeAbsolute(dir);
127  }
128
129  private Path makeAbsolute(Path path) {
130    if (path.isAbsolute()) {
131      return path;
132    }
133    return new Path(workingDir, path);
134  }
135
136  /**
137   * @param permission Currently ignored.
138   */
139  @Override
140  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
141    Path absolutePath = makeAbsolute(path);
142    List<Path> paths = new ArrayList<Path>();
143    do {
144      paths.add(0, absolutePath);
145      absolutePath = absolutePath.getParent();
146    } while (absolutePath != null);
147    
148    boolean result = true;
149    for (Path p : paths) {
150      result &= mkdir(p);
151    }
152    return result;
153  }
154  
155  private boolean mkdir(Path path) throws IOException {
156    Path absolutePath = makeAbsolute(path);
157    INode inode = store.retrieveINode(absolutePath);
158    if (inode == null) {
159      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
160    } else if (inode.isFile()) {
161      throw new IOException(String.format(
162          "Can't make directory for path %s since it is a file.",
163          absolutePath));
164    }
165    return true;
166  }
167
168  @Override
169  public boolean isFile(Path path) throws IOException {
170    INode inode = store.retrieveINode(makeAbsolute(path));
171    if (inode == null) {
172      return false;
173    }
174    return inode.isFile();
175  }
176
177  private INode checkFile(Path path) throws IOException {
178    INode inode = store.retrieveINode(makeAbsolute(path));
179    if (inode == null) {
180      throw new IOException("No such file.");
181    }
182    if (inode.isDirectory()) {
183      throw new IOException("Path " + path + " is a directory.");
184    }
185    return inode;
186  }
187
188  @Override
189  public FileStatus[] listStatus(Path f) throws IOException {
190    Path absolutePath = makeAbsolute(f);
191    INode inode = store.retrieveINode(absolutePath);
192    if (inode == null) {
193      throw new FileNotFoundException("File " + f + " does not exist.");
194    }
195    if (inode.isFile()) {
196      return new FileStatus[] {
197        new S3FileStatus(f.makeQualified(this), inode)
198      };
199    }
200    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
201    for (Path p : store.listSubPaths(absolutePath)) {
202      ret.add(getFileStatus(p.makeQualified(this)));
203    }
204    return ret.toArray(new FileStatus[0]);
205  }
206
207  /** This optional operation is not yet supported. */
208  @Override
209  public FSDataOutputStream append(Path f, int bufferSize,
210      Progressable progress) throws IOException {
211    throw new IOException("Not supported");
212  }
213
214  /**
215   * @param permission Currently ignored.
216   */
217  @Override
218  public FSDataOutputStream create(Path file, FsPermission permission,
219      boolean overwrite, int bufferSize,
220      short replication, long blockSize, Progressable progress)
221    throws IOException {
222
223    INode inode = store.retrieveINode(makeAbsolute(file));
224    if (inode != null) {
225      if (overwrite) {
226        delete(file, true);
227      } else {
228        throw new FileAlreadyExistsException("File already exists: " + file);
229      }
230    } else {
231      Path parent = file.getParent();
232      if (parent != null) {
233        if (!mkdirs(parent)) {
234          throw new IOException("Mkdirs failed to create " + parent.toString());
235        }
236      }      
237    }
238    return new FSDataOutputStream
239        (new S3OutputStream(getConf(), store, makeAbsolute(file),
240                            blockSize, progress, bufferSize),
241         statistics);
242  }
243
244  @Override
245  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
246    INode inode = checkFile(path);
247    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
248                                                   statistics));
249  }
250
251  @Override
252  public boolean rename(Path src, Path dst) throws IOException {
253    Path absoluteSrc = makeAbsolute(src);
254    INode srcINode = store.retrieveINode(absoluteSrc);
255    if (srcINode == null) {
256      // src path doesn't exist
257      return false; 
258    }
259    Path absoluteDst = makeAbsolute(dst);
260    INode dstINode = store.retrieveINode(absoluteDst);
261    if (dstINode != null && dstINode.isDirectory()) {
262      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
263      dstINode = store.retrieveINode(absoluteDst);
264    }
265    if (dstINode != null) {
266      // dst path already exists - can't overwrite
267      return false;
268    }
269    Path dstParent = absoluteDst.getParent();
270    if (dstParent != null) {
271      INode dstParentINode = store.retrieveINode(dstParent);
272      if (dstParentINode == null || dstParentINode.isFile()) {
273        // dst parent doesn't exist or is a file
274        return false;
275      }
276    }
277    return renameRecursive(absoluteSrc, absoluteDst);
278  }
279  
280  private boolean renameRecursive(Path src, Path dst) throws IOException {
281    INode srcINode = store.retrieveINode(src);
282    store.storeINode(dst, srcINode);
283    store.deleteINode(src);
284    if (srcINode.isDirectory()) {
285      for (Path oldSrc : store.listDeepSubPaths(src)) {
286        INode inode = store.retrieveINode(oldSrc);
287        if (inode == null) {
288          return false;
289        }
290        String oldSrcPath = oldSrc.toUri().getPath();
291        String srcPath = src.toUri().getPath();
292        String dstPath = dst.toUri().getPath();
293        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
294        store.storeINode(newDst, inode);
295        store.deleteINode(oldSrc);
296      }
297    }
298    return true;
299  }
300
301  @Override
302  public boolean delete(Path path, boolean recursive) throws IOException {
303   Path absolutePath = makeAbsolute(path);
304   INode inode = store.retrieveINode(absolutePath);
305   if (inode == null) {
306     return false;
307   }
308   if (inode.isFile()) {
309     store.deleteINode(absolutePath);
310     for (Block block: inode.getBlocks()) {
311       store.deleteBlock(block);
312     }
313   } else {
314     FileStatus[] contents = null; 
315     try {
316       contents = listStatus(absolutePath);
317     } catch(FileNotFoundException fnfe) {
318       return false;
319     }
320
321     if ((contents.length !=0) && (!recursive)) {
322       throw new IOException("Directory " + path.toString() 
323           + " is not empty.");
324     }
325     for (FileStatus p:contents) {
326       if (!delete(p.getPath(), recursive)) {
327         return false;
328       }
329     }
330     store.deleteINode(absolutePath);
331   }
332   return true;
333  }
334  
335  /**
336   * FileStatus for S3 file systems. 
337   */
338  @Override
339  public FileStatus getFileStatus(Path f)  throws IOException {
340    INode inode = store.retrieveINode(makeAbsolute(f));
341    if (inode == null) {
342      throw new FileNotFoundException(f + ": No such file or directory.");
343    }
344    return new S3FileStatus(f.makeQualified(this), inode);
345  }
346  
347  @Override
348  public long getDefaultBlockSize() {
349    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
350  }
351
352  @Override
353  public String getCanonicalServiceName() {
354    // Does not support Token
355    return null;
356  }
357
358  // diagnostic methods
359
360  void dump() throws IOException {
361    store.dump();
362  }
363
364  void purge() throws IOException {
365    store.purge();
366  }
367
368  private static class S3FileStatus extends FileStatus {
369
370    S3FileStatus(Path f, INode inode) throws IOException {
371      super(findLength(inode), inode.isDirectory(), 1,
372            findBlocksize(inode), 0, f);
373    }
374
375    private static long findLength(INode inode) {
376      if (!inode.isDirectory()) {
377        long length = 0L;
378        for (Block block : inode.getBlocks()) {
379          length += block.getLength();
380        }
381        return length;
382      }
383      return 0;
384    }
385
386    private static long findBlocksize(INode inode) {
387      final Block[] ret = inode.getBlocks();
388      return ret == null ? 0L : ret[0].getLength();
389    }
390  }
391}