001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.URI;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.fs.FileAlreadyExistsException;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.fs.permission.FsPermission;
040import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
041import org.apache.hadoop.io.retry.RetryPolicies;
042import org.apache.hadoop.io.retry.RetryPolicy;
043import org.apache.hadoop.io.retry.RetryProxy;
044import org.apache.hadoop.util.Progressable;
045
046/**
047 * <p>
048 * A block-based {@link FileSystem} backed by
049 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
050 * </p>
051 * @see NativeS3FileSystem
052 */
053@InterfaceAudience.Public
054@InterfaceStability.Stable
055public class S3FileSystem extends FileSystem {
056
057  private URI uri;
058
059  private FileSystemStore store;
060
061  private Path workingDir;
062
063  public S3FileSystem() {
064    // set store in initialize()
065  }
066  
067  public S3FileSystem(FileSystemStore store) {
068    this.store = store;
069  }
070
071  /**
072   * Return the protocol scheme for the FileSystem.
073   * <p/>
074   *
075   * @return <code>s3</code>
076   */
077  @Override
078  public String getScheme() {
079    return "s3";
080  }
081
082  @Override
083  public URI getUri() {
084    return uri;
085  }
086
087  @Override
088  public void initialize(URI uri, Configuration conf) throws IOException {
089    super.initialize(uri, conf);
090    if (store == null) {
091      store = createDefaultStore(conf);
092    }
093    store.initialize(uri, conf);
094    setConf(conf);
095    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
096    this.workingDir =
097      new Path("/user", System.getProperty("user.name")).makeQualified(this);
098  }  
099
100  private static FileSystemStore createDefaultStore(Configuration conf) {
101    FileSystemStore store = new Jets3tFileSystemStore();
102    
103    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
104                                                                               conf.getInt("fs.s3.maxRetries", 4),
105                                                                               conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
106    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
107      new HashMap<Class<? extends Exception>, RetryPolicy>();
108    exceptionToPolicyMap.put(IOException.class, basePolicy);
109    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
110    
111    RetryPolicy methodPolicy = RetryPolicies.retryByException(
112                                                              RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
113    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
114    methodNameToPolicyMap.put("storeBlock", methodPolicy);
115    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
116    
117    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
118                                               store, methodNameToPolicyMap);
119  }
120
121  @Override
122  public Path getWorkingDirectory() {
123    return workingDir;
124  }
125
126  @Override
127  public void setWorkingDirectory(Path dir) {
128    workingDir = makeAbsolute(dir);
129  }
130
131  private Path makeAbsolute(Path path) {
132    if (path.isAbsolute()) {
133      return path;
134    }
135    return new Path(workingDir, path);
136  }
137
138  /**
139   * @param permission Currently ignored.
140   */
141  @Override
142  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
143    Path absolutePath = makeAbsolute(path);
144    List<Path> paths = new ArrayList<Path>();
145    do {
146      paths.add(0, absolutePath);
147      absolutePath = absolutePath.getParent();
148    } while (absolutePath != null);
149    
150    boolean result = true;
151    for (Path p : paths) {
152      result &= mkdir(p);
153    }
154    return result;
155  }
156  
157  private boolean mkdir(Path path) throws IOException {
158    Path absolutePath = makeAbsolute(path);
159    INode inode = store.retrieveINode(absolutePath);
160    if (inode == null) {
161      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
162    } else if (inode.isFile()) {
163      throw new IOException(String.format(
164          "Can't make directory for path %s since it is a file.",
165          absolutePath));
166    }
167    return true;
168  }
169
170  @Override
171  public boolean isFile(Path path) throws IOException {
172    INode inode = store.retrieveINode(makeAbsolute(path));
173    if (inode == null) {
174      return false;
175    }
176    return inode.isFile();
177  }
178
179  private INode checkFile(Path path) throws IOException {
180    INode inode = store.retrieveINode(makeAbsolute(path));
181    if (inode == null) {
182      throw new IOException("No such file.");
183    }
184    if (inode.isDirectory()) {
185      throw new IOException("Path " + path + " is a directory.");
186    }
187    return inode;
188  }
189
190  @Override
191  public FileStatus[] listStatus(Path f) throws IOException {
192    Path absolutePath = makeAbsolute(f);
193    INode inode = store.retrieveINode(absolutePath);
194    if (inode == null) {
195      throw new FileNotFoundException("File " + f + " does not exist.");
196    }
197    if (inode.isFile()) {
198      return new FileStatus[] {
199        new S3FileStatus(f.makeQualified(this), inode)
200      };
201    }
202    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
203    for (Path p : store.listSubPaths(absolutePath)) {
204      ret.add(getFileStatus(p.makeQualified(this)));
205    }
206    return ret.toArray(new FileStatus[0]);
207  }
208
209  /** This optional operation is not yet supported. */
210  @Override
211  public FSDataOutputStream append(Path f, int bufferSize,
212      Progressable progress) throws IOException {
213    throw new IOException("Not supported");
214  }
215
216  /**
217   * @param permission Currently ignored.
218   */
219  @Override
220  public FSDataOutputStream create(Path file, FsPermission permission,
221      boolean overwrite, int bufferSize,
222      short replication, long blockSize, Progressable progress)
223    throws IOException {
224
225    INode inode = store.retrieveINode(makeAbsolute(file));
226    if (inode != null) {
227      if (overwrite) {
228        delete(file, true);
229      } else {
230        throw new FileAlreadyExistsException("File already exists: " + file);
231      }
232    } else {
233      Path parent = file.getParent();
234      if (parent != null) {
235        if (!mkdirs(parent)) {
236          throw new IOException("Mkdirs failed to create " + parent.toString());
237        }
238      }      
239    }
240    return new FSDataOutputStream
241        (new S3OutputStream(getConf(), store, makeAbsolute(file),
242                            blockSize, progress, bufferSize),
243         statistics);
244  }
245
246  @Override
247  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
248    INode inode = checkFile(path);
249    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
250                                                   statistics));
251  }
252
253  @Override
254  public boolean rename(Path src, Path dst) throws IOException {
255    Path absoluteSrc = makeAbsolute(src);
256    INode srcINode = store.retrieveINode(absoluteSrc);
257    if (srcINode == null) {
258      // src path doesn't exist
259      return false; 
260    }
261    Path absoluteDst = makeAbsolute(dst);
262    INode dstINode = store.retrieveINode(absoluteDst);
263    if (dstINode != null && dstINode.isDirectory()) {
264      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
265      dstINode = store.retrieveINode(absoluteDst);
266    }
267    if (dstINode != null) {
268      // dst path already exists - can't overwrite
269      return false;
270    }
271    Path dstParent = absoluteDst.getParent();
272    if (dstParent != null) {
273      INode dstParentINode = store.retrieveINode(dstParent);
274      if (dstParentINode == null || dstParentINode.isFile()) {
275        // dst parent doesn't exist or is a file
276        return false;
277      }
278    }
279    return renameRecursive(absoluteSrc, absoluteDst);
280  }
281  
282  private boolean renameRecursive(Path src, Path dst) throws IOException {
283    INode srcINode = store.retrieveINode(src);
284    store.storeINode(dst, srcINode);
285    store.deleteINode(src);
286    if (srcINode.isDirectory()) {
287      for (Path oldSrc : store.listDeepSubPaths(src)) {
288        INode inode = store.retrieveINode(oldSrc);
289        if (inode == null) {
290          return false;
291        }
292        String oldSrcPath = oldSrc.toUri().getPath();
293        String srcPath = src.toUri().getPath();
294        String dstPath = dst.toUri().getPath();
295        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
296        store.storeINode(newDst, inode);
297        store.deleteINode(oldSrc);
298      }
299    }
300    return true;
301  }
302
303  @Override
304  public boolean delete(Path path, boolean recursive) throws IOException {
305   Path absolutePath = makeAbsolute(path);
306   INode inode = store.retrieveINode(absolutePath);
307   if (inode == null) {
308     return false;
309   }
310   if (inode.isFile()) {
311     store.deleteINode(absolutePath);
312     for (Block block: inode.getBlocks()) {
313       store.deleteBlock(block);
314     }
315   } else {
316     FileStatus[] contents = null; 
317     try {
318       contents = listStatus(absolutePath);
319     } catch(FileNotFoundException fnfe) {
320       return false;
321     }
322
323     if ((contents.length !=0) && (!recursive)) {
324       throw new IOException("Directory " + path.toString() 
325           + " is not empty.");
326     }
327     for (FileStatus p:contents) {
328       if (!delete(p.getPath(), recursive)) {
329         return false;
330       }
331     }
332     store.deleteINode(absolutePath);
333   }
334   return true;
335  }
336  
337  /**
338   * FileStatus for S3 file systems. 
339   */
340  @Override
341  public FileStatus getFileStatus(Path f)  throws IOException {
342    INode inode = store.retrieveINode(makeAbsolute(f));
343    if (inode == null) {
344      throw new FileNotFoundException(f + ": No such file or directory.");
345    }
346    return new S3FileStatus(f.makeQualified(this), inode);
347  }
348  
349  @Override
350  public long getDefaultBlockSize() {
351    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
352  }
353
354  @Override
355  public String getCanonicalServiceName() {
356    // Does not support Token
357    return null;
358  }
359
360  // diagnostic methods
361
362  void dump() throws IOException {
363    store.dump();
364  }
365
366  void purge() throws IOException {
367    store.purge();
368  }
369
370  private static class S3FileStatus extends FileStatus {
371
372    S3FileStatus(Path f, INode inode) throws IOException {
373      super(findLength(inode), inode.isDirectory(), 1,
374            findBlocksize(inode), 0, f);
375    }
376
377    private static long findLength(INode inode) {
378      if (!inode.isDirectory()) {
379        long length = 0L;
380        for (Block block : inode.getBlocks()) {
381          length += block.getLength();
382        }
383        return length;
384      }
385      return 0;
386    }
387
388    private static long findBlocksize(INode inode) {
389      final Block[] ret = inode.getBlocks();
390      return ret == null ? 0L : ret[0].getLength();
391    }
392  }
393}