001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3native;
020
021import java.io.BufferedOutputStream;
022import java.io.EOFException;
023import java.io.File;
024import java.io.FileNotFoundException;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.net.URI;
030import java.security.DigestOutputStream;
031import java.security.MessageDigest;
032import java.security.NoSuchAlgorithmException;
033import java.util.ArrayList;
034import java.util.HashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.TimeUnit;
040
041import com.google.common.base.Preconditions;
042import org.apache.hadoop.classification.InterfaceAudience;
043import org.apache.hadoop.classification.InterfaceStability;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.BufferedFSInputStream;
046import org.apache.hadoop.fs.FSDataInputStream;
047import org.apache.hadoop.fs.FSDataOutputStream;
048import org.apache.hadoop.fs.FSExceptionMessages;
049import org.apache.hadoop.fs.FSInputStream;
050import org.apache.hadoop.fs.FileAlreadyExistsException;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.LocalDirAllocator;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.permission.FsPermission;
056import org.apache.hadoop.fs.s3.S3Exception;
057import org.apache.hadoop.io.IOUtils;
058import org.apache.hadoop.io.retry.RetryPolicies;
059import org.apache.hadoop.io.retry.RetryPolicy;
060import org.apache.hadoop.io.retry.RetryProxy;
061import org.apache.hadoop.util.Progressable;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A {@link FileSystem} for reading and writing files stored on
067 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
068 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
069 * stores files on S3 in their
070 * native form so they can be read by other S3 tools.
071 * <p>
072 * A note about directories. S3 of course has no "native" support for them.
073 * The idiom we choose then is: for any directory created by this class,
074 * we use an empty object "#{dirpath}_$folder$" as a marker.
075 * Further, to interoperate with other S3 tools, we also accept the following:
076 * <ul>
077 *   <li>an object "#{dirpath}/' denoting a directory marker</li>
078 *   <li>
079 *     if there exists any objects with the prefix "#{dirpath}/", then the
080 *     directory is said to exist
081 *   </li>
082 *   <li>
083 *     if both a file with the name of a directory and a marker for that
084 *     directory exists, then the *file masks the directory*, and the directory
085 *     is never returned.
086 *   </li>
087 * </ul>
088 *
089 * @see org.apache.hadoop.fs.s3.S3FileSystem
090 */
091@InterfaceAudience.Public
092@InterfaceStability.Stable
093public class NativeS3FileSystem extends FileSystem {
094  
095  public static final Logger LOG =
096      LoggerFactory.getLogger(NativeS3FileSystem.class);
097  
098  private static final String FOLDER_SUFFIX = "_$folder$";
099  static final String PATH_DELIMITER = Path.SEPARATOR;
100  private static final int S3_MAX_LISTING_LENGTH = 1000;
101  
102  static class NativeS3FsInputStream extends FSInputStream {
103    
104    private NativeFileSystemStore store;
105    private Statistics statistics;
106    private InputStream in;
107    private final String key;
108    private long pos = 0;
109    
110    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
111      Preconditions.checkNotNull(in, "Null input stream");
112      this.store = store;
113      this.statistics = statistics;
114      this.in = in;
115      this.key = key;
116    }
117    
118    @Override
119    public synchronized int read() throws IOException {
120      int result;
121      try {
122        result = in.read();
123      } catch (IOException e) {
124        LOG.info("Received IOException while reading '{}', attempting to reopen",
125            key);
126        LOG.debug("{}", e, e);
127        try {
128          reopen(pos);
129          result = in.read();
130        } catch (EOFException eof) {
131          LOG.debug("EOF on input stream read: {}", eof, eof);
132          result = -1;
133        }
134      } 
135      if (result != -1) {
136        pos++;
137      }
138      if (statistics != null && result != -1) {
139        statistics.incrementBytesRead(1);
140      }
141      return result;
142    }
143    @Override
144    public synchronized int read(byte[] b, int off, int len)
145      throws IOException {
146      if (in == null) {
147        throw new EOFException("Cannot read closed stream");
148      }
149      int result = -1;
150      try {
151        result = in.read(b, off, len);
152      } catch (EOFException eof) {
153        throw eof;
154      } catch (IOException e) {
155        LOG.info( "Received IOException while reading '{}'," +
156                  " attempting to reopen.", key);
157        reopen(pos);
158        result = in.read(b, off, len);
159      }
160      if (result > 0) {
161        pos += result;
162      }
163      if (statistics != null && result > 0) {
164        statistics.incrementBytesRead(result);
165      }
166      return result;
167    }
168
169    @Override
170    public synchronized void close() throws IOException {
171      closeInnerStream();
172    }
173
174    /**
175     * Close the inner stream if not null. Even if an exception
176     * is raised during the close, the field is set to null
177     */
178    private void closeInnerStream() {
179      IOUtils.closeStream(in);
180      in = null;
181    }
182
183    /**
184     * Reopen a new input stream with the specified position
185     * @param pos the position to reopen a new stream
186     * @throws IOException
187     */
188    private synchronized void reopen(long pos) throws IOException {
189        LOG.debug("Reopening key '{}' for reading at position '{}", key, pos);
190        InputStream newStream = store.retrieve(key, pos);
191        updateInnerStream(newStream, pos);
192    }
193
194    /**
195     * Update inner stream with a new stream and position
196     * @param newStream new stream -must not be null
197     * @param newpos new position
198     * @throws IOException IO exception on a failure to close the existing
199     * stream.
200     */
201    private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
202      Preconditions.checkNotNull(newStream, "Null newstream argument");
203      closeInnerStream();
204      in = newStream;
205      this.pos = newpos;
206    }
207
208    @Override
209    public synchronized void seek(long newpos) throws IOException {
210      if (newpos < 0) {
211        throw new EOFException(
212            FSExceptionMessages.NEGATIVE_SEEK);
213      }
214      if (pos != newpos) {
215        // the seek is attempting to move the current position
216        reopen(newpos);
217      }
218    }
219
220    @Override
221    public synchronized long getPos() throws IOException {
222      return pos;
223    }
224    @Override
225    public boolean seekToNewSource(long targetPos) throws IOException {
226      return false;
227    }
228  }
229  
230  private class NativeS3FsOutputStream extends OutputStream {
231    
232    private Configuration conf;
233    private String key;
234    private File backupFile;
235    private OutputStream backupStream;
236    private MessageDigest digest;
237    private boolean closed;
238    private LocalDirAllocator lDirAlloc;
239    
240    public NativeS3FsOutputStream(Configuration conf,
241        NativeFileSystemStore store, String key, Progressable progress,
242        int bufferSize) throws IOException {
243      this.conf = conf;
244      this.key = key;
245      this.backupFile = newBackupFile();
246      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
247      try {
248        this.digest = MessageDigest.getInstance("MD5");
249        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
250            new FileOutputStream(backupFile), this.digest));
251      } catch (NoSuchAlgorithmException e) {
252        LOG.warn("Cannot load MD5 digest algorithm," +
253            "skipping message integrity check.", e);
254        this.backupStream = new BufferedOutputStream(
255            new FileOutputStream(backupFile));
256      }
257    }
258
259    private File newBackupFile() throws IOException {
260      if (lDirAlloc == null) {
261        lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir");
262      }
263      File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
264      result.deleteOnExit();
265      return result;
266    }
267    
268    @Override
269    public void flush() throws IOException {
270      backupStream.flush();
271    }
272    
273    @Override
274    public synchronized void close() throws IOException {
275      if (closed) {
276        return;
277      }
278
279      backupStream.close();
280      LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
281      
282      try {
283        byte[] md5Hash = digest == null ? null : digest.digest();
284        store.storeFile(key, backupFile, md5Hash);
285      } finally {
286        if (!backupFile.delete()) {
287          LOG.warn("Could not delete temporary s3n file: " + backupFile);
288        }
289        super.close();
290        closed = true;
291      } 
292      LOG.info("OutputStream for key '{}' upload complete", key);
293    }
294
295    @Override
296    public void write(int b) throws IOException {
297      backupStream.write(b);
298    }
299
300    @Override
301    public void write(byte[] b, int off, int len) throws IOException {
302      backupStream.write(b, off, len);
303    }
304  }
305  
306  private URI uri;
307  private NativeFileSystemStore store;
308  private Path workingDir;
309  
310  public NativeS3FileSystem() {
311    // set store in initialize()
312  }
313  
314  public NativeS3FileSystem(NativeFileSystemStore store) {
315    this.store = store;
316  }
317
318  /**
319   * Return the protocol scheme for the FileSystem.
320   *
321   * @return <code>s3n</code>
322   */
323  @Override
324  public String getScheme() {
325    return "s3n";
326  }
327
328  @Override
329  public void initialize(URI uri, Configuration conf) throws IOException {
330    super.initialize(uri, conf);
331    if (store == null) {
332      store = createDefaultStore(conf);
333    }
334    store.initialize(uri, conf);
335    setConf(conf);
336    this.uri = S3xLoginHelper.buildFSURI(uri);
337    this.workingDir =
338      new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
339  }
340  
341  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
342    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
343    
344    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
345        conf.getInt("fs.s3.maxRetries", 4),
346        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
347    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
348      new HashMap<Class<? extends Exception>, RetryPolicy>();
349    exceptionToPolicyMap.put(IOException.class, basePolicy);
350    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
351    
352    RetryPolicy methodPolicy = RetryPolicies.retryByException(
353        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
354    Map<String, RetryPolicy> methodNameToPolicyMap =
355      new HashMap<String, RetryPolicy>();
356    methodNameToPolicyMap.put("storeFile", methodPolicy);
357    methodNameToPolicyMap.put("rename", methodPolicy);
358    
359    return (NativeFileSystemStore)
360      RetryProxy.create(NativeFileSystemStore.class, store,
361          methodNameToPolicyMap);
362  }
363  
364  private static String pathToKey(Path path) {
365    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
366      // allow uris without trailing slash after bucket to refer to root,
367      // like s3n://mybucket
368      return "";
369    }
370    if (!path.isAbsolute()) {
371      throw new IllegalArgumentException("Path must be absolute: " + path);
372    }
373    String ret = path.toUri().getPath().substring(1); // remove initial slash
374    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
375      ret = ret.substring(0, ret.length() -1);
376  }
377    return ret;
378  }
379  
380  private static Path keyToPath(String key) {
381    return new Path("/" + key);
382  }
383  
384  private Path makeAbsolute(Path path) {
385    if (path.isAbsolute()) {
386      return path;
387    }
388    return new Path(workingDir, path);
389  }
390
391  /**
392   * Check that a Path belongs to this FileSystem.
393   * Unlike the superclass, this version does not look at authority,
394   * only hostnames.
395   * @param path to check
396   * @throws IllegalArgumentException if there is an FS mismatch
397   */
398  @Override
399  protected void checkPath(Path path) {
400    S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
401  }
402
403  @Override
404  protected URI canonicalizeUri(URI rawUri) {
405    return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
406  }
407
408  /** This optional operation is not yet supported. */
409  @Override
410  public FSDataOutputStream append(Path f, int bufferSize,
411      Progressable progress) throws IOException {
412    throw new IOException("Not supported");
413  }
414  
415  @Override
416  public FSDataOutputStream create(Path f, FsPermission permission,
417      boolean overwrite, int bufferSize, short replication, long blockSize,
418      Progressable progress) throws IOException {
419
420    if (exists(f) && !overwrite) {
421      throw new FileAlreadyExistsException("File already exists: " + f);
422    }
423    
424    if(LOG.isDebugEnabled()) {
425      LOG.debug("Creating new file '" + f + "' in S3");
426    }
427    Path absolutePath = makeAbsolute(f);
428    String key = pathToKey(absolutePath);
429    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
430        key, progress, bufferSize), statistics);
431  }
432  
433  @Override
434  public boolean delete(Path f, boolean recurse) throws IOException {
435    FileStatus status;
436    try {
437      status = getFileStatus(f);
438    } catch (FileNotFoundException e) {
439      if(LOG.isDebugEnabled()) {
440        LOG.debug("Delete called for '" + f +
441            "' but file does not exist, so returning false");
442      }
443      return false;
444    }
445    Path absolutePath = makeAbsolute(f);
446    String key = pathToKey(absolutePath);
447    if (status.isDirectory()) {
448      if (!recurse && listStatus(f).length > 0) {
449        throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
450      }
451
452      createParent(f);
453
454      if(LOG.isDebugEnabled()) {
455        LOG.debug("Deleting directory '" + f  + "'");
456      }
457      String priorLastKey = null;
458      do {
459        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
460        for (FileMetadata file : listing.getFiles()) {
461          store.delete(file.getKey());
462        }
463        priorLastKey = listing.getPriorLastKey();
464      } while (priorLastKey != null);
465
466      try {
467        store.delete(key + FOLDER_SUFFIX);
468      } catch (FileNotFoundException e) {
469        //this is fine, we don't require a marker
470      }
471    } else {
472      if(LOG.isDebugEnabled()) {
473        LOG.debug("Deleting file '" + f + "'");
474      }
475      createParent(f);
476      store.delete(key);
477    }
478    return true;
479  }
480
481  @Override
482  public FileStatus getFileStatus(Path f) throws IOException {
483    Path absolutePath = makeAbsolute(f);
484    String key = pathToKey(absolutePath);
485    
486    if (key.length() == 0) { // root always exists
487      return newDirectory(absolutePath);
488    }
489    
490    if(LOG.isDebugEnabled()) {
491      LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
492    }
493    FileMetadata meta = store.retrieveMetadata(key);
494    if (meta != null) {
495      if(LOG.isDebugEnabled()) {
496        LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
497      }
498      return newFile(meta, absolutePath);
499    }
500    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
501      if(LOG.isDebugEnabled()) {
502        LOG.debug("getFileStatus returning 'directory' for key '" + key +
503            "' as '" + key + FOLDER_SUFFIX + "' exists");
504      }
505      return newDirectory(absolutePath);
506    }
507    
508    if(LOG.isDebugEnabled()) {
509      LOG.debug("getFileStatus listing key '" + key + "'");
510    }
511    PartialListing listing = store.list(key, 1);
512    if (listing.getFiles().length > 0 ||
513        listing.getCommonPrefixes().length > 0) {
514      if(LOG.isDebugEnabled()) {
515        LOG.debug("getFileStatus returning 'directory' for key '" + key +
516            "' as it has contents");
517      }
518      return newDirectory(absolutePath);
519    }
520    
521    if(LOG.isDebugEnabled()) {
522      LOG.debug("getFileStatus could not find key '" + key + "'");
523    }
524    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
525  }
526
527  @Override
528  public URI getUri() {
529    return uri;
530  }
531
532  /**
533   * <p>
534   * If <code>f</code> is a file, this method will make a single call to S3.
535   * If <code>f</code> is a directory, this method will make a maximum of
536   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
537   * files and directories contained directly in <code>f</code>.
538   * </p>
539   */
540  @Override
541  public FileStatus[] listStatus(Path f) throws IOException {
542
543    Path absolutePath = makeAbsolute(f);
544    String key = pathToKey(absolutePath);
545    
546    if (key.length() > 0) {
547      FileMetadata meta = store.retrieveMetadata(key);
548      if (meta != null) {
549        return new FileStatus[] { newFile(meta, absolutePath) };
550      }
551    }
552    
553    URI pathUri = absolutePath.toUri();
554    Set<FileStatus> status = new TreeSet<FileStatus>();
555    String priorLastKey = null;
556    do {
557      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
558      for (FileMetadata fileMetadata : listing.getFiles()) {
559        Path subpath = keyToPath(fileMetadata.getKey());
560        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
561
562        if (fileMetadata.getKey().equals(key + "/")) {
563          // this is just the directory we have been asked to list
564        }
565        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
566          status.add(newDirectory(new Path(
567              absolutePath,
568              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
569        }
570        else {
571          status.add(newFile(fileMetadata, subpath));
572        }
573      }
574      for (String commonPrefix : listing.getCommonPrefixes()) {
575        Path subpath = keyToPath(commonPrefix);
576        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
577        // sometimes the common prefix includes the base dir (HADOOP-13830).
578        // avoid that problem by detecting it and keeping it out
579        // of the list
580        if (!relativePath.isEmpty()) {
581          status.add(newDirectory(new Path(absolutePath, relativePath)));
582        }
583      }
584      priorLastKey = listing.getPriorLastKey();
585    } while (priorLastKey != null);
586    
587    if (status.isEmpty() &&
588        key.length() > 0 &&
589        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
590      throw new FileNotFoundException("File " + f + " does not exist.");
591    }
592    
593    return status.toArray(new FileStatus[status.size()]);
594  }
595  
596  private FileStatus newFile(FileMetadata meta, Path path) {
597    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
598        meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
599  }
600  
601  private FileStatus newDirectory(Path path) {
602    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
603  }
604
605  @Override
606  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
607    Path absolutePath = makeAbsolute(f);
608    List<Path> paths = new ArrayList<Path>();
609    do {
610      paths.add(0, absolutePath);
611      absolutePath = absolutePath.getParent();
612    } while (absolutePath != null);
613    
614    boolean result = true;
615    for (Path path : paths) {
616      result &= mkdir(path);
617    }
618    return result;
619  }
620  
621  private boolean mkdir(Path f) throws IOException {
622    try {
623      FileStatus fileStatus = getFileStatus(f);
624      if (fileStatus.isFile()) {
625        throw new FileAlreadyExistsException(String.format(
626            "Can't make directory for path '%s' since it is a file.", f));
627
628      }
629    } catch (FileNotFoundException e) {
630      if(LOG.isDebugEnabled()) {
631        LOG.debug("Making dir '" + f + "' in S3");
632      }
633      String key = pathToKey(f) + FOLDER_SUFFIX;
634      store.storeEmptyFile(key);    
635    }
636    return true;
637  }
638
639  @Override
640  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
641    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
642    if (fs.isDirectory()) {
643      throw new FileNotFoundException("'" + f + "' is a directory");
644    }
645    LOG.info("Opening '" + f + "' for reading");
646    Path absolutePath = makeAbsolute(f);
647    String key = pathToKey(absolutePath);
648    return new FSDataInputStream(new BufferedFSInputStream(
649        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
650  }
651  
652  // rename() and delete() use this method to ensure that the parent directory
653  // of the source does not vanish.
654  private void createParent(Path path) throws IOException {
655    Path parent = path.getParent();
656    if (parent != null) {
657      String key = pathToKey(makeAbsolute(parent));
658      if (key.length() > 0) {
659          store.storeEmptyFile(key + FOLDER_SUFFIX);
660      }
661    }
662  }
663  
664    
665  @Override
666  public boolean rename(Path src, Path dst) throws IOException {
667
668    String srcKey = pathToKey(makeAbsolute(src));
669
670    if (srcKey.length() == 0) {
671      // Cannot rename root of file system
672      return false;
673    }
674
675    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
676
677    // Figure out the final destination
678    String dstKey;
679    try {
680      boolean dstIsFile = getFileStatus(dst).isFile();
681      if (dstIsFile) {
682        if(LOG.isDebugEnabled()) {
683          LOG.debug(debugPreamble +
684              "returning false as dst is an already existing file");
685        }
686        return false;
687      } else {
688        if(LOG.isDebugEnabled()) {
689          LOG.debug(debugPreamble + "using dst as output directory");
690        }
691        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
692      }
693    } catch (FileNotFoundException e) {
694      if(LOG.isDebugEnabled()) {
695        LOG.debug(debugPreamble + "using dst as output destination");
696      }
697      dstKey = pathToKey(makeAbsolute(dst));
698      try {
699        if (getFileStatus(dst.getParent()).isFile()) {
700          if(LOG.isDebugEnabled()) {
701            LOG.debug(debugPreamble +
702                "returning false as dst parent exists and is a file");
703          }
704          return false;
705        }
706      } catch (FileNotFoundException ex) {
707        if(LOG.isDebugEnabled()) {
708          LOG.debug(debugPreamble +
709              "returning false as dst parent does not exist");
710        }
711        return false;
712      }
713    }
714
715    boolean srcIsFile;
716    try {
717      srcIsFile = getFileStatus(src).isFile();
718    } catch (FileNotFoundException e) {
719      if(LOG.isDebugEnabled()) {
720        LOG.debug(debugPreamble + "returning false as src does not exist");
721      }
722      return false;
723    }
724    if (srcIsFile) {
725      if(LOG.isDebugEnabled()) {
726        LOG.debug(debugPreamble +
727            "src is file, so doing copy then delete in S3");
728      }
729      store.copy(srcKey, dstKey);
730      store.delete(srcKey);
731    } else {
732      if(LOG.isDebugEnabled()) {
733        LOG.debug(debugPreamble + "src is directory, so copying contents");
734      }
735      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
736
737      List<String> keysToDelete = new ArrayList<String>();
738      String priorLastKey = null;
739      do {
740        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
741        for (FileMetadata file : listing.getFiles()) {
742          keysToDelete.add(file.getKey());
743          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
744        }
745        priorLastKey = listing.getPriorLastKey();
746      } while (priorLastKey != null);
747
748      if(LOG.isDebugEnabled()) {
749        LOG.debug(debugPreamble +
750            "all files in src copied, now removing src files");
751      }
752      for (String key: keysToDelete) {
753        store.delete(key);
754      }
755
756      try {
757        store.delete(srcKey + FOLDER_SUFFIX);
758      } catch (FileNotFoundException e) {
759        //this is fine, we don't require a marker
760      }
761      if(LOG.isDebugEnabled()) {
762        LOG.debug(debugPreamble + "done");
763      }
764    }
765
766    return true;
767  }
768  
769  @Override
770  public long getDefaultBlockSize() {
771    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
772  }
773
774  /**
775   * Set the working directory to the given directory.
776   */
777  @Override
778  public void setWorkingDirectory(Path newDir) {
779    workingDir = newDir;
780  }
781  
782  @Override
783  public Path getWorkingDirectory() {
784    return workingDir;
785  }
786
787  @Override
788  public String getCanonicalServiceName() {
789    // Does not support Token
790    return null;
791  }
792}