001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3native;
020
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
039
040import org.apache.commons.logging.Log;
041import org.apache.commons.logging.LogFactory;
042import org.apache.hadoop.classification.InterfaceAudience;
043import org.apache.hadoop.classification.InterfaceStability;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.BufferedFSInputStream;
046import org.apache.hadoop.fs.FSDataInputStream;
047import org.apache.hadoop.fs.FSDataOutputStream;
048import org.apache.hadoop.fs.FSInputStream;
049import org.apache.hadoop.fs.FileStatus;
050import org.apache.hadoop.fs.FileSystem;
051import org.apache.hadoop.fs.Path;
052import org.apache.hadoop.fs.permission.FsPermission;
053import org.apache.hadoop.fs.s3.S3Exception;
054import org.apache.hadoop.io.retry.RetryPolicies;
055import org.apache.hadoop.io.retry.RetryPolicy;
056import org.apache.hadoop.io.retry.RetryProxy;
057import org.apache.hadoop.util.Progressable;
058
059/**
060 * <p>
061 * A {@link FileSystem} for reading and writing files stored on
062 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
063 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
064 * stores files on S3 in their
065 * native form so they can be read by other S3 tools.
066 *
067 * A note about directories. S3 of course has no "native" support for them.
068 * The idiom we choose then is: for any directory created by this class,
069 * we use an empty object "#{dirpath}_$folder$" as a marker.
070 * Further, to interoperate with other S3 tools, we also accept the following:
071 *  - an object "#{dirpath}/' denoting a directory marker
072 *  - if there exists any objects with the prefix "#{dirpath}/", then the
073 *    directory is said to exist
074 *  - if both a file with the name of a directory and a marker for that
075 *    directory exists, then the *file masks the directory*, and the directory
076 *    is never returned.
077 * </p>
078 * @see org.apache.hadoop.fs.s3.S3FileSystem
079 */
080@InterfaceAudience.Public
081@InterfaceStability.Stable
082public class NativeS3FileSystem extends FileSystem {
083  
084  public static final Log LOG = 
085    LogFactory.getLog(NativeS3FileSystem.class);
086  
087  private static final String FOLDER_SUFFIX = "_$folder$";
088  static final String PATH_DELIMITER = Path.SEPARATOR;
089  private static final int S3_MAX_LISTING_LENGTH = 1000;
090  
091  static class NativeS3FsInputStream extends FSInputStream {
092    
093    private NativeFileSystemStore store;
094    private Statistics statistics;
095    private InputStream in;
096    private final String key;
097    private long pos = 0;
098    
099    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
100      this.store = store;
101      this.statistics = statistics;
102      this.in = in;
103      this.key = key;
104    }
105    
106    @Override
107    public synchronized int read() throws IOException {
108      int result = -1;
109      try {
110        result = in.read();
111      } catch (IOException e) {
112        LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
113        seek(pos);
114        result = in.read();
115      } 
116      if (result != -1) {
117        pos++;
118      }
119      if (statistics != null && result != -1) {
120        statistics.incrementBytesRead(1);
121      }
122      return result;
123    }
124    @Override
125    public synchronized int read(byte[] b, int off, int len)
126      throws IOException {
127      
128      int result = -1;
129      try {
130        result = in.read(b, off, len);
131      } catch (IOException e) {
132        LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
133        seek(pos);
134        result = in.read(b, off, len);
135      }
136      if (result > 0) {
137        pos += result;
138      }
139      if (statistics != null && result > 0) {
140        statistics.incrementBytesRead(result);
141      }
142      return result;
143    }
144
145    @Override
146    public void close() throws IOException {
147      in.close();
148    }
149
150    @Override
151    public synchronized void seek(long pos) throws IOException {
152      in.close();
153      LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'");
154      in = store.retrieve(key, pos);
155      this.pos = pos;
156    }
157    @Override
158    public synchronized long getPos() throws IOException {
159      return pos;
160    }
161    @Override
162    public boolean seekToNewSource(long targetPos) throws IOException {
163      return false;
164    }
165  }
166  
167  private class NativeS3FsOutputStream extends OutputStream {
168    
169    private Configuration conf;
170    private String key;
171    private File backupFile;
172    private OutputStream backupStream;
173    private MessageDigest digest;
174    private boolean closed;
175    
176    public NativeS3FsOutputStream(Configuration conf,
177        NativeFileSystemStore store, String key, Progressable progress,
178        int bufferSize) throws IOException {
179      this.conf = conf;
180      this.key = key;
181      this.backupFile = newBackupFile();
182      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
183      try {
184        this.digest = MessageDigest.getInstance("MD5");
185        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
186            new FileOutputStream(backupFile), this.digest));
187      } catch (NoSuchAlgorithmException e) {
188        LOG.warn("Cannot load MD5 digest algorithm," +
189            "skipping message integrity check.", e);
190        this.backupStream = new BufferedOutputStream(
191            new FileOutputStream(backupFile));
192      }
193    }
194
195    private File newBackupFile() throws IOException {
196      File dir = new File(conf.get("fs.s3.buffer.dir"));
197      if (!dir.mkdirs() && !dir.exists()) {
198        throw new IOException("Cannot create S3 buffer directory: " + dir);
199      }
200      File result = File.createTempFile("output-", ".tmp", dir);
201      result.deleteOnExit();
202      return result;
203    }
204    
205    @Override
206    public void flush() throws IOException {
207      backupStream.flush();
208    }
209    
210    @Override
211    public synchronized void close() throws IOException {
212      if (closed) {
213        return;
214      }
215
216      backupStream.close();
217      LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
218      
219      try {
220        byte[] md5Hash = digest == null ? null : digest.digest();
221        store.storeFile(key, backupFile, md5Hash);
222      } finally {
223        if (!backupFile.delete()) {
224          LOG.warn("Could not delete temporary s3n file: " + backupFile);
225        }
226        super.close();
227        closed = true;
228      } 
229      LOG.info("OutputStream for key '" + key + "' upload complete");
230    }
231
232    @Override
233    public void write(int b) throws IOException {
234      backupStream.write(b);
235    }
236
237    @Override
238    public void write(byte[] b, int off, int len) throws IOException {
239      backupStream.write(b, off, len);
240    }
241  }
242  
243  private URI uri;
244  private NativeFileSystemStore store;
245  private Path workingDir;
246  
247  public NativeS3FileSystem() {
248    // set store in initialize()
249  }
250  
251  public NativeS3FileSystem(NativeFileSystemStore store) {
252    this.store = store;
253  }
254  
255  @Override
256  public void initialize(URI uri, Configuration conf) throws IOException {
257    super.initialize(uri, conf);
258    if (store == null) {
259      store = createDefaultStore(conf);
260    }
261    store.initialize(uri, conf);
262    setConf(conf);
263    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
264    this.workingDir =
265      new Path("/user", System.getProperty("user.name")).makeQualified(this);
266  }
267  
268  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
269    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
270    
271    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
272        conf.getInt("fs.s3.maxRetries", 4),
273        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
274    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
275      new HashMap<Class<? extends Exception>, RetryPolicy>();
276    exceptionToPolicyMap.put(IOException.class, basePolicy);
277    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
278    
279    RetryPolicy methodPolicy = RetryPolicies.retryByException(
280        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
281    Map<String, RetryPolicy> methodNameToPolicyMap =
282      new HashMap<String, RetryPolicy>();
283    methodNameToPolicyMap.put("storeFile", methodPolicy);
284    methodNameToPolicyMap.put("rename", methodPolicy);
285    
286    return (NativeFileSystemStore)
287      RetryProxy.create(NativeFileSystemStore.class, store,
288          methodNameToPolicyMap);
289  }
290  
291  private static String pathToKey(Path path) {
292    if (path.toUri().getScheme() != null && "".equals(path.toUri().getPath())) {
293      // allow uris without trailing slash after bucket to refer to root,
294      // like s3n://mybucket
295      return "";
296    }
297    if (!path.isAbsolute()) {
298      throw new IllegalArgumentException("Path must be absolute: " + path);
299    }
300    String ret = path.toUri().getPath().substring(1); // remove initial slash
301    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
302      ret = ret.substring(0, ret.length() -1);
303  }
304    return ret;
305  }
306  
307  private static Path keyToPath(String key) {
308    return new Path("/" + key);
309  }
310  
311  private Path makeAbsolute(Path path) {
312    if (path.isAbsolute()) {
313      return path;
314    }
315    return new Path(workingDir, path);
316  }
317
318  /** This optional operation is not yet supported. */
319  @Override
320  public FSDataOutputStream append(Path f, int bufferSize,
321      Progressable progress) throws IOException {
322    throw new IOException("Not supported");
323  }
324  
325  @Override
326  public FSDataOutputStream create(Path f, FsPermission permission,
327      boolean overwrite, int bufferSize, short replication, long blockSize,
328      Progressable progress) throws IOException {
329
330    if (exists(f) && !overwrite) {
331      throw new IOException("File already exists:"+f);
332    }
333    
334    if(LOG.isDebugEnabled()) {
335      LOG.debug("Creating new file '" + f + "' in S3");
336    }
337    Path absolutePath = makeAbsolute(f);
338    String key = pathToKey(absolutePath);
339    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
340        key, progress, bufferSize), statistics);
341  }
342  
343  @Override
344  public boolean delete(Path f, boolean recurse) throws IOException {
345    FileStatus status;
346    try {
347      status = getFileStatus(f);
348    } catch (FileNotFoundException e) {
349      if(LOG.isDebugEnabled()) {
350        LOG.debug("Delete called for '" + f +
351            "' but file does not exist, so returning false");
352      }
353      return false;
354    }
355    Path absolutePath = makeAbsolute(f);
356    String key = pathToKey(absolutePath);
357    if (status.isDirectory()) {
358      if (!recurse && listStatus(f).length > 0) {
359        throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false");
360      }
361
362      createParent(f);
363
364      if(LOG.isDebugEnabled()) {
365        LOG.debug("Deleting directory '" + f  + "'");
366      }
367      String priorLastKey = null;
368      do {
369        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
370        for (FileMetadata file : listing.getFiles()) {
371          store.delete(file.getKey());
372        }
373        priorLastKey = listing.getPriorLastKey();
374      } while (priorLastKey != null);
375
376      try {
377        store.delete(key + FOLDER_SUFFIX);
378      } catch (FileNotFoundException e) {
379        //this is fine, we don't require a marker
380      }
381    } else {
382      if(LOG.isDebugEnabled()) {
383        LOG.debug("Deleting file '" + f + "'");
384      }
385      createParent(f);
386      store.delete(key);
387    }
388    return true;
389  }
390
391  @Override
392  public FileStatus getFileStatus(Path f) throws IOException {
393    Path absolutePath = makeAbsolute(f);
394    String key = pathToKey(absolutePath);
395    
396    if (key.length() == 0) { // root always exists
397      return newDirectory(absolutePath);
398    }
399    
400    if(LOG.isDebugEnabled()) {
401      LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
402    }
403    FileMetadata meta = store.retrieveMetadata(key);
404    if (meta != null) {
405      if(LOG.isDebugEnabled()) {
406        LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
407      }
408      return newFile(meta, absolutePath);
409    }
410    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
411      if(LOG.isDebugEnabled()) {
412        LOG.debug("getFileStatus returning 'directory' for key '" + key +
413            "' as '" + key + FOLDER_SUFFIX + "' exists");
414      }
415      return newDirectory(absolutePath);
416    }
417    
418    if(LOG.isDebugEnabled()) {
419      LOG.debug("getFileStatus listing key '" + key + "'");
420    }
421    PartialListing listing = store.list(key, 1);
422    if (listing.getFiles().length > 0 ||
423        listing.getCommonPrefixes().length > 0) {
424      if(LOG.isDebugEnabled()) {
425        LOG.debug("getFileStatus returning 'directory' for key '" + key +
426            "' as it has contents");
427      }
428      return newDirectory(absolutePath);
429    }
430    
431    if(LOG.isDebugEnabled()) {
432      LOG.debug("getFileStatus could not find key '" + key + "'");
433    }
434    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
435  }
436
437  @Override
438  public URI getUri() {
439    return uri;
440  }
441
442  /**
443   * <p>
444   * If <code>f</code> is a file, this method will make a single call to S3.
445   * If <code>f</code> is a directory, this method will make a maximum of
446   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
447   * files and directories contained directly in <code>f</code>.
448   * </p>
449   */
450  @Override
451  public FileStatus[] listStatus(Path f) throws IOException {
452
453    Path absolutePath = makeAbsolute(f);
454    String key = pathToKey(absolutePath);
455    
456    if (key.length() > 0) {
457      FileMetadata meta = store.retrieveMetadata(key);
458      if (meta != null) {
459        return new FileStatus[] { newFile(meta, absolutePath) };
460      }
461    }
462    
463    URI pathUri = absolutePath.toUri();
464    Set<FileStatus> status = new TreeSet<FileStatus>();
465    String priorLastKey = null;
466    do {
467      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
468      for (FileMetadata fileMetadata : listing.getFiles()) {
469        Path subpath = keyToPath(fileMetadata.getKey());
470        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
471
472        if (fileMetadata.getKey().equals(key + "/")) {
473          // this is just the directory we have been asked to list
474        }
475        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
476          status.add(newDirectory(new Path(
477              absolutePath,
478              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
479        }
480        else {
481          status.add(newFile(fileMetadata, subpath));
482        }
483      }
484      for (String commonPrefix : listing.getCommonPrefixes()) {
485        Path subpath = keyToPath(commonPrefix);
486        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
487        status.add(newDirectory(new Path(absolutePath, relativePath)));
488      }
489      priorLastKey = listing.getPriorLastKey();
490    } while (priorLastKey != null);
491    
492    if (status.isEmpty() &&
493        key.length() > 0 &&
494        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
495      throw new FileNotFoundException("File " + f + " does not exist.");
496    }
497    
498    return status.toArray(new FileStatus[status.size()]);
499  }
500  
501  private FileStatus newFile(FileMetadata meta, Path path) {
502    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
503        meta.getLastModified(), path.makeQualified(this));
504  }
505  
506  private FileStatus newDirectory(Path path) {
507    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this));
508  }
509
510  @Override
511  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
512    Path absolutePath = makeAbsolute(f);
513    List<Path> paths = new ArrayList<Path>();
514    do {
515      paths.add(0, absolutePath);
516      absolutePath = absolutePath.getParent();
517    } while (absolutePath != null);
518    
519    boolean result = true;
520    for (Path path : paths) {
521      result &= mkdir(path);
522    }
523    return result;
524  }
525  
526  private boolean mkdir(Path f) throws IOException {
527    try {
528      FileStatus fileStatus = getFileStatus(f);
529      if (fileStatus.isFile()) {
530        throw new IOException(String.format(
531            "Can't make directory for path '%s' since it is a file.", f));
532
533      }
534    } catch (FileNotFoundException e) {
535      if(LOG.isDebugEnabled()) {
536        LOG.debug("Making dir '" + f + "' in S3");
537      }
538      String key = pathToKey(f) + FOLDER_SUFFIX;
539      store.storeEmptyFile(key);    
540    }
541    return true;
542  }
543
544  @Override
545  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
546    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
547    if (fs.isDirectory()) {
548      throw new IOException("'" + f + "' is a directory");
549    }
550    LOG.info("Opening '" + f + "' for reading");
551    Path absolutePath = makeAbsolute(f);
552    String key = pathToKey(absolutePath);
553    return new FSDataInputStream(new BufferedFSInputStream(
554        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
555  }
556  
557  // rename() and delete() use this method to ensure that the parent directory
558  // of the source does not vanish.
559  private void createParent(Path path) throws IOException {
560    Path parent = path.getParent();
561    if (parent != null) {
562      String key = pathToKey(makeAbsolute(parent));
563      if (key.length() > 0) {
564          store.storeEmptyFile(key + FOLDER_SUFFIX);
565      }
566    }
567  }
568  
569    
570  @Override
571  public boolean rename(Path src, Path dst) throws IOException {
572
573    String srcKey = pathToKey(makeAbsolute(src));
574
575    if (srcKey.length() == 0) {
576      // Cannot rename root of file system
577      return false;
578    }
579
580    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
581
582    // Figure out the final destination
583    String dstKey;
584    try {
585      boolean dstIsFile = getFileStatus(dst).isFile();
586      if (dstIsFile) {
587        if(LOG.isDebugEnabled()) {
588          LOG.debug(debugPreamble +
589              "returning false as dst is an already existing file");
590        }
591        return false;
592      } else {
593        if(LOG.isDebugEnabled()) {
594          LOG.debug(debugPreamble + "using dst as output directory");
595        }
596        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
597      }
598    } catch (FileNotFoundException e) {
599      if(LOG.isDebugEnabled()) {
600        LOG.debug(debugPreamble + "using dst as output destination");
601      }
602      dstKey = pathToKey(makeAbsolute(dst));
603      try {
604        if (getFileStatus(dst.getParent()).isFile()) {
605          if(LOG.isDebugEnabled()) {
606            LOG.debug(debugPreamble +
607                "returning false as dst parent exists and is a file");
608          }
609          return false;
610        }
611      } catch (FileNotFoundException ex) {
612        if(LOG.isDebugEnabled()) {
613          LOG.debug(debugPreamble +
614              "returning false as dst parent does not exist");
615        }
616        return false;
617      }
618    }
619
620    boolean srcIsFile;
621    try {
622      srcIsFile = getFileStatus(src).isFile();
623    } catch (FileNotFoundException e) {
624      if(LOG.isDebugEnabled()) {
625        LOG.debug(debugPreamble + "returning false as src does not exist");
626      }
627      return false;
628    }
629    if (srcIsFile) {
630      if(LOG.isDebugEnabled()) {
631        LOG.debug(debugPreamble +
632            "src is file, so doing copy then delete in S3");
633      }
634      store.copy(srcKey, dstKey);
635      store.delete(srcKey);
636    } else {
637      if(LOG.isDebugEnabled()) {
638        LOG.debug(debugPreamble + "src is directory, so copying contents");
639      }
640      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
641
642      List<String> keysToDelete = new ArrayList<String>();
643      String priorLastKey = null;
644      do {
645        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
646        for (FileMetadata file : listing.getFiles()) {
647          keysToDelete.add(file.getKey());
648          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
649        }
650        priorLastKey = listing.getPriorLastKey();
651      } while (priorLastKey != null);
652
653      if(LOG.isDebugEnabled()) {
654        LOG.debug(debugPreamble +
655            "all files in src copied, now removing src files");
656      }
657      for (String key: keysToDelete) {
658        store.delete(key);
659      }
660
661      try {
662        store.delete(srcKey + FOLDER_SUFFIX);
663      } catch (FileNotFoundException e) {
664        //this is fine, we don't require a marker
665      }
666      if(LOG.isDebugEnabled()) {
667        LOG.debug(debugPreamble + "done");
668      }
669    }
670
671    return true;
672  }
673  
674  @Override
675  public long getDefaultBlockSize() {
676    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
677  }
678
679  /**
680   * Set the working directory to the given directory.
681   */
682  @Override
683  public void setWorkingDirectory(Path newDir) {
684    workingDir = newDir;
685  }
686  
687  @Override
688  public Path getWorkingDirectory() {
689    return workingDir;
690  }
691}