001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.URI;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030
031import org.apache.hadoop.classification.InterfaceAudience;
032import org.apache.hadoop.classification.InterfaceStability;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.fs.FSDataOutputStream;
036import org.apache.hadoop.fs.FileAlreadyExistsException;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.ParentNotDirectoryException;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.fs.permission.FsPermission;
042import org.apache.hadoop.fs.s3a.S3AFileSystem;
043import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
044import org.apache.hadoop.fs.s3native.S3xLoginHelper;
045import org.apache.hadoop.io.retry.RetryPolicies;
046import org.apache.hadoop.io.retry.RetryPolicy;
047import org.apache.hadoop.io.retry.RetryProxy;
048import org.apache.hadoop.util.Progressable;
049
050/**
051 * A block-based {@link FileSystem} backed by
052 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
053 *
054 * @see NativeS3FileSystem
055 * @deprecated Use {@link NativeS3FileSystem} and {@link S3AFileSystem} instead.
056 */
057@InterfaceAudience.Public
058@InterfaceStability.Stable
059@Deprecated
060public class S3FileSystem extends FileSystem {
061
062  private static final AtomicBoolean hasWarnedDeprecation
063      = new AtomicBoolean(false);
064
065  private URI uri;
066
067  private FileSystemStore store;
068
069  private Path workingDir;
070
071  public S3FileSystem() {
072    // set store in initialize()
073  }
074
075  public S3FileSystem(FileSystemStore store) {
076    this.store = store;
077  }
078
079  /**
080   * This is to warn the first time in a JVM that an S3FileSystem is created.
081   */
082  private static void warnDeprecation() {
083    if (!hasWarnedDeprecation.getAndSet(true)) {
084      LOG.warn("S3FileSystem is deprecated and will be removed in " +
085          "future releases. Use NativeS3FileSystem or S3AFileSystem instead.");
086    }
087  }
088
089  /**
090   * Return the protocol scheme for the FileSystem.
091   *
092   * @return <code>s3</code>
093   */
094  @Override
095  public String getScheme() {
096    return "s3";
097  }
098
099  @Override
100  public URI getUri() {
101    return uri;
102  }
103
104  @Override
105  public void initialize(URI uri, Configuration conf) throws IOException {
106    super.initialize(uri, conf);
107    warnDeprecation();
108    if (store == null) {
109      store = createDefaultStore(conf);
110    }
111    store.initialize(uri, conf);
112    setConf(conf);
113    this.uri = S3xLoginHelper.buildFSURI(uri);
114    this.workingDir =
115      new Path("/user", System.getProperty("user.name")).makeQualified(this);
116  }
117
118  private static FileSystemStore createDefaultStore(Configuration conf) {
119    FileSystemStore store = new Jets3tFileSystemStore();
120
121    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
122                                                                               conf.getInt("fs.s3.maxRetries", 4),
123                                                                               conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
124    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
125      new HashMap<Class<? extends Exception>, RetryPolicy>();
126    exceptionToPolicyMap.put(IOException.class, basePolicy);
127    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
128
129    RetryPolicy methodPolicy = RetryPolicies.retryByException(
130                                                              RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
131    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
132    methodNameToPolicyMap.put("storeBlock", methodPolicy);
133    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
134
135    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
136                                               store, methodNameToPolicyMap);
137  }
138
139  @Override
140  public Path getWorkingDirectory() {
141    return workingDir;
142  }
143
144  @Override
145  public void setWorkingDirectory(Path dir) {
146    workingDir = makeAbsolute(dir);
147  }
148
149  private Path makeAbsolute(Path path) {
150    if (path.isAbsolute()) {
151      return path;
152    }
153    return new Path(workingDir, path);
154  }
155
156  /**
157   * Check that a Path belongs to this FileSystem.
158   * Unlike the superclass, this version does not look at authority,
159   * only hostnames.
160   * @param path to check
161   * @throws IllegalArgumentException if there is an FS mismatch
162   */
163  @Override
164  protected void checkPath(Path path) {
165    S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
166  }
167
168  @Override
169  protected URI canonicalizeUri(URI rawUri) {
170    return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
171  }
172
173  /**
174   * @param permission Currently ignored.
175   */
176  @Override
177  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
178    Path absolutePath = makeAbsolute(path);
179    List<Path> paths = new ArrayList<Path>();
180    do {
181      paths.add(0, absolutePath);
182      absolutePath = absolutePath.getParent();
183    } while (absolutePath != null);
184
185    boolean result = true;
186    for (int i = 0; i < paths.size(); i++) {
187      Path p = paths.get(i);
188      try {
189        result &= mkdir(p);
190      } catch(FileAlreadyExistsException e) {
191        if (i + 1 < paths.size()) {
192          throw new ParentNotDirectoryException(e.getMessage());
193        }
194        throw e;
195      }
196    }
197    return result;
198  }
199
200  private boolean mkdir(Path path) throws IOException {
201    Path absolutePath = makeAbsolute(path);
202    INode inode = store.retrieveINode(absolutePath);
203    if (inode == null) {
204      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
205    } else if (inode.isFile()) {
206      throw new FileAlreadyExistsException(String.format(
207          "Can't make directory for path %s since it is a file.",
208          absolutePath));
209    }
210    return true;
211  }
212
213  @Override
214  public boolean isFile(Path path) throws IOException {
215    INode inode = store.retrieveINode(makeAbsolute(path));
216    if (inode == null) {
217      return false;
218    }
219    return inode.isFile();
220  }
221
222  private INode checkFile(Path path) throws IOException {
223    INode inode = store.retrieveINode(makeAbsolute(path));
224    String message = String.format("No such file: '%s'", path.toString());
225    if (inode == null) {
226      throw new FileNotFoundException(message + " does not exist");
227    }
228    if (inode.isDirectory()) {
229      throw new FileNotFoundException(message + " is a directory");
230    }
231    return inode;
232  }
233
234  @Override
235  public FileStatus[] listStatus(Path f) throws IOException {
236    Path absolutePath = makeAbsolute(f);
237    INode inode = store.retrieveINode(absolutePath);
238    if (inode == null) {
239      throw new FileNotFoundException("File " + f + " does not exist.");
240    }
241    if (inode.isFile()) {
242      return new FileStatus[] {
243        new S3FileStatus(f.makeQualified(this), inode)
244      };
245    }
246    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
247    for (Path p : store.listSubPaths(absolutePath)) {
248      ret.add(getFileStatus(p.makeQualified(this)));
249    }
250    return ret.toArray(new FileStatus[0]);
251  }
252
253  /** This optional operation is not yet supported. */
254  @Override
255  public FSDataOutputStream append(Path f, int bufferSize,
256      Progressable progress) throws IOException {
257    throw new IOException("Not supported");
258  }
259
260  /**
261   * @param permission Currently ignored.
262   */
263  @Override
264  public FSDataOutputStream create(Path file, FsPermission permission,
265      boolean overwrite, int bufferSize,
266      short replication, long blockSize, Progressable progress)
267    throws IOException {
268
269    INode inode = store.retrieveINode(makeAbsolute(file));
270    if (inode != null) {
271      if (overwrite && !inode.isDirectory()) {
272        delete(file, true);
273      } else {
274        String message = String.format("File already exists: '%s'", file);
275        if (inode.isDirectory()) {
276          message = message + " is a directory";
277        }
278        throw new FileAlreadyExistsException(message);
279      }
280    } else {
281      Path parent = file.getParent();
282      if (parent != null) {
283        if (!mkdirs(parent)) {
284          throw new IOException("Mkdirs failed to create " + parent.toString());
285        }
286      }
287    }
288    return new FSDataOutputStream
289        (new S3OutputStream(getConf(), store, makeAbsolute(file),
290                            blockSize, progress, bufferSize),
291         statistics);
292  }
293
294  @Override
295  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
296    INode inode = checkFile(path);
297    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
298                                                   statistics));
299  }
300
301  @Override
302  public boolean rename(Path src, Path dst) throws IOException {
303    Path absoluteSrc = makeAbsolute(src);
304    INode srcINode = store.retrieveINode(absoluteSrc);
305    if (srcINode == null) {
306      // src path doesn't exist
307      return false;
308    }
309    Path absoluteDst = makeAbsolute(dst);
310    INode dstINode = store.retrieveINode(absoluteDst);
311    if (dstINode != null && dstINode.isDirectory()) {
312      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
313      dstINode = store.retrieveINode(absoluteDst);
314    }
315    if (dstINode != null) {
316      // dst path already exists - can't overwrite
317      return false;
318    }
319    Path dstParent = absoluteDst.getParent();
320    if (dstParent != null) {
321      INode dstParentINode = store.retrieveINode(dstParent);
322      if (dstParentINode == null || dstParentINode.isFile()) {
323        // dst parent doesn't exist or is a file
324        return false;
325      }
326    }
327    return renameRecursive(absoluteSrc, absoluteDst);
328  }
329  
330  private boolean renameRecursive(Path src, Path dst) throws IOException {
331    INode srcINode = store.retrieveINode(src);
332    store.storeINode(dst, srcINode);
333    store.deleteINode(src);
334    if (srcINode.isDirectory()) {
335      for (Path oldSrc : store.listDeepSubPaths(src)) {
336        INode inode = store.retrieveINode(oldSrc);
337        if (inode == null) {
338          return false;
339        }
340        String oldSrcPath = oldSrc.toUri().getPath();
341        String srcPath = src.toUri().getPath();
342        String dstPath = dst.toUri().getPath();
343        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
344        store.storeINode(newDst, inode);
345        store.deleteINode(oldSrc);
346      }
347    }
348    return true;
349  }
350
351  @Override
352  public boolean delete(Path path, boolean recursive) throws IOException {
353   Path absolutePath = makeAbsolute(path);
354   INode inode = store.retrieveINode(absolutePath);
355   if (inode == null) {
356     return false;
357   }
358   if (inode.isFile()) {
359     store.deleteINode(absolutePath);
360     for (Block block: inode.getBlocks()) {
361       store.deleteBlock(block);
362     }
363   } else {
364     FileStatus[] contents = null;
365     try {
366       contents = listStatus(absolutePath);
367     } catch(FileNotFoundException fnfe) {
368       return false;
369     }
370
371     if ((contents.length !=0) && (!recursive)) {
372       throw new IOException("Directory " + path.toString()
373           + " is not empty.");
374     }
375     for (FileStatus p:contents) {
376       if (!delete(p.getPath(), recursive)) {
377         return false;
378       }
379     }
380     store.deleteINode(absolutePath);
381   }
382   return true;
383  }
384
385  /**
386   * FileStatus for S3 file systems.
387   */
388  @Override
389  public FileStatus getFileStatus(Path f)  throws IOException {
390    INode inode = store.retrieveINode(makeAbsolute(f));
391    if (inode == null) {
392      throw new FileNotFoundException(f + ": No such file or directory.");
393    }
394    return new S3FileStatus(f.makeQualified(this), inode);
395  }
396
397  @Override
398  public long getDefaultBlockSize() {
399    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
400  }
401
402  @Override
403  public String getCanonicalServiceName() {
404    // Does not support Token
405    return null;
406  }
407
408  // diagnostic methods
409
410  void dump() throws IOException {
411    store.dump();
412  }
413
414  void purge() throws IOException {
415    store.purge();
416  }
417
418  private static class S3FileStatus extends FileStatus {
419
420    S3FileStatus(Path f, INode inode) throws IOException {
421      super(findLength(inode), inode.isDirectory(), 1,
422            findBlocksize(inode), 0, f);
423    }
424
425    private static long findLength(INode inode) {
426      if (!inode.isDirectory()) {
427        long length = 0L;
428        for (Block block : inode.getBlocks()) {
429          length += block.getLength();
430        }
431        return length;
432      }
433      return 0;
434    }
435
436    private static long findBlocksize(INode inode) {
437      final Block[] ret = inode.getBlocks();
438      return ret == null ? 0L : ret[0].getLength();
439    }
440  }
441}