001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3native;
020
021import java.io.BufferedOutputStream;
022import java.io.EOFException;
023import java.io.File;
024import java.io.FileNotFoundException;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.net.URI;
030import java.security.DigestOutputStream;
031import java.security.MessageDigest;
032import java.security.NoSuchAlgorithmException;
033import java.util.ArrayList;
034import java.util.HashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.TimeUnit;
040
041import com.google.common.base.Preconditions;
042import org.apache.hadoop.classification.InterfaceAudience;
043import org.apache.hadoop.classification.InterfaceStability;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.BufferedFSInputStream;
046import org.apache.hadoop.fs.FSDataInputStream;
047import org.apache.hadoop.fs.FSDataOutputStream;
048import org.apache.hadoop.fs.FSExceptionMessages;
049import org.apache.hadoop.fs.FSInputStream;
050import org.apache.hadoop.fs.FileAlreadyExistsException;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.LocalDirAllocator;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.permission.FsPermission;
056import org.apache.hadoop.fs.s3.S3Exception;
057import org.apache.hadoop.io.IOUtils;
058import org.apache.hadoop.io.retry.RetryPolicies;
059import org.apache.hadoop.io.retry.RetryPolicy;
060import org.apache.hadoop.io.retry.RetryProxy;
061import org.apache.hadoop.util.Progressable;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A {@link FileSystem} for reading and writing files stored on
067 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
068 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
069 * stores files on S3 in their
070 * native form so they can be read by other S3 tools.
071 * <p>
072 * A note about directories. S3 of course has no "native" support for them.
073 * The idiom we choose then is: for any directory created by this class,
074 * we use an empty object "#{dirpath}_$folder$" as a marker.
075 * Further, to interoperate with other S3 tools, we also accept the following:
076 * <ul>
077 *   <li>an object "#{dirpath}/" denoting a directory marker</li>
078 *   <li>
079 *     if there exist any objects with the prefix "#{dirpath}/", then the
080 *     directory is said to exist
081 *   </li>
082 *   <li>
083 *     if both a file with the name of a directory and a marker for that
084 *     directory exist, then the *file masks the directory*, and the directory
085 *     is never returned.
086 *   </li>
087 * </ul>
088 *
089 * @see org.apache.hadoop.fs.s3.S3FileSystem
090 */
091@InterfaceAudience.Public
092@InterfaceStability.Stable
093public class NativeS3FileSystem extends FileSystem {
094  
095  public static final Logger LOG =
096      LoggerFactory.getLogger(NativeS3FileSystem.class);
097  
098  private static final String FOLDER_SUFFIX = "_$folder$";
099  static final String PATH_DELIMITER = Path.SEPARATOR;
100  private static final int S3_MAX_LISTING_LENGTH = 1000;
101  
102  static class NativeS3FsInputStream extends FSInputStream {
103    
104    private NativeFileSystemStore store;
105    private Statistics statistics;
106    private InputStream in;
107    private final String key;
108    private long pos = 0;
109    
110    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
111      Preconditions.checkNotNull(in, "Null input stream");
112      this.store = store;
113      this.statistics = statistics;
114      this.in = in;
115      this.key = key;
116    }
117    
118    @Override
119    public synchronized int read() throws IOException {
120      int result;
121      try {
122        result = in.read();
123      } catch (IOException e) {
124        LOG.info("Received IOException while reading '{}', attempting to reopen",
125            key);
126        LOG.debug("{}", e, e);
127        try {
128          reopen(pos);
129          result = in.read();
130        } catch (EOFException eof) {
131          LOG.debug("EOF on input stream read: {}", eof, eof);
132          result = -1;
133        }
134      } 
135      if (result != -1) {
136        pos++;
137      }
138      if (statistics != null && result != -1) {
139        statistics.incrementBytesRead(1);
140      }
141      return result;
142    }
143    @Override
144    public synchronized int read(byte[] b, int off, int len)
145      throws IOException {
146      if (in == null) {
147        throw new EOFException("Cannot read closed stream");
148      }
149      int result = -1;
150      try {
151        result = in.read(b, off, len);
152      } catch (EOFException eof) {
153        throw eof;
154      } catch (IOException e) {
155        LOG.info( "Received IOException while reading '{}'," +
156                  " attempting to reopen.", key);
157        reopen(pos);
158        result = in.read(b, off, len);
159      }
160      if (result > 0) {
161        pos += result;
162      }
163      if (statistics != null && result > 0) {
164        statistics.incrementBytesRead(result);
165      }
166      return result;
167    }
168
169    @Override
170    public synchronized void close() throws IOException {
171      closeInnerStream();
172    }
173
174    /**
175     * Close the inner stream if not null. Even if an exception
176     * is raised during the close, the field is set to null
177     */
178    private void closeInnerStream() {
179      IOUtils.closeStream(in);
180      in = null;
181    }
182
183    /**
184     * Reopen a new input stream with the specified position
185     * @param pos the position to reopen a new stream
186     * @throws IOException
187     */
188    private synchronized void reopen(long pos) throws IOException {
189        LOG.debug("Reopening key '{}' for reading at position '{}", key, pos);
190        InputStream newStream = store.retrieve(key, pos);
191        updateInnerStream(newStream, pos);
192    }
193
194    /**
195     * Update inner stream with a new stream and position
196     * @param newStream new stream -must not be null
197     * @param newpos new position
198     * @throws IOException IO exception on a failure to close the existing
199     * stream.
200     */
201    private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
202      Preconditions.checkNotNull(newStream, "Null newstream argument");
203      closeInnerStream();
204      in = newStream;
205      this.pos = newpos;
206    }
207
208    @Override
209    public synchronized void seek(long newpos) throws IOException {
210      if (newpos < 0) {
211        throw new EOFException(
212            FSExceptionMessages.NEGATIVE_SEEK);
213      }
214      if (pos != newpos) {
215        // the seek is attempting to move the current position
216        reopen(newpos);
217      }
218    }
219
220    @Override
221    public synchronized long getPos() throws IOException {
222      return pos;
223    }
224    @Override
225    public boolean seekToNewSource(long targetPos) throws IOException {
226      return false;
227    }
228  }
229  
230  private class NativeS3FsOutputStream extends OutputStream {
231    
232    private Configuration conf;
233    private String key;
234    private File backupFile;
235    private OutputStream backupStream;
236    private MessageDigest digest;
237    private boolean closed;
238    private LocalDirAllocator lDirAlloc;
239    
240    public NativeS3FsOutputStream(Configuration conf,
241        NativeFileSystemStore store, String key, Progressable progress,
242        int bufferSize) throws IOException {
243      this.conf = conf;
244      this.key = key;
245      this.backupFile = newBackupFile();
246      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
247      try {
248        this.digest = MessageDigest.getInstance("MD5");
249        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
250            new FileOutputStream(backupFile), this.digest));
251      } catch (NoSuchAlgorithmException e) {
252        LOG.warn("Cannot load MD5 digest algorithm," +
253            "skipping message integrity check.", e);
254        this.backupStream = new BufferedOutputStream(
255            new FileOutputStream(backupFile));
256      }
257    }
258
259    private File newBackupFile() throws IOException {
260      if (lDirAlloc == null) {
261        lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir");
262      }
263      File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
264      result.deleteOnExit();
265      return result;
266    }
267    
268    @Override
269    public void flush() throws IOException {
270      backupStream.flush();
271    }
272    
273    @Override
274    public synchronized void close() throws IOException {
275      if (closed) {
276        return;
277      }
278
279      backupStream.close();
280      LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
281      
282      try {
283        byte[] md5Hash = digest == null ? null : digest.digest();
284        store.storeFile(key, backupFile, md5Hash);
285      } finally {
286        if (!backupFile.delete()) {
287          LOG.warn("Could not delete temporary s3n file: " + backupFile);
288        }
289        super.close();
290        closed = true;
291      } 
292      LOG.info("OutputStream for key '{}' upload complete", key);
293    }
294
295    @Override
296    public void write(int b) throws IOException {
297      backupStream.write(b);
298    }
299
300    @Override
301    public void write(byte[] b, int off, int len) throws IOException {
302      backupStream.write(b, off, len);
303    }
304  }
305  
306  private URI uri;
307  private NativeFileSystemStore store;
308  private Path workingDir;
309  
310  public NativeS3FileSystem() {
311    // set store in initialize()
312  }
313  
314  public NativeS3FileSystem(NativeFileSystemStore store) {
315    this.store = store;
316  }
317
318  /**
319   * Return the protocol scheme for the FileSystem.
320   *
321   * @return <code>s3n</code>
322   */
323  @Override
324  public String getScheme() {
325    return "s3n";
326  }
327
328  @Override
329  public void initialize(URI uri, Configuration conf) throws IOException {
330    super.initialize(uri, conf);
331    if (store == null) {
332      store = createDefaultStore(conf);
333    }
334    store.initialize(uri, conf);
335    setConf(conf);
336    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
337    this.workingDir =
338      new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
339  }
340  
341  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
342    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
343    
344    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
345        conf.getInt("fs.s3.maxRetries", 4),
346        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
347    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
348      new HashMap<Class<? extends Exception>, RetryPolicy>();
349    exceptionToPolicyMap.put(IOException.class, basePolicy);
350    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
351    
352    RetryPolicy methodPolicy = RetryPolicies.retryByException(
353        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
354    Map<String, RetryPolicy> methodNameToPolicyMap =
355      new HashMap<String, RetryPolicy>();
356    methodNameToPolicyMap.put("storeFile", methodPolicy);
357    methodNameToPolicyMap.put("rename", methodPolicy);
358    
359    return (NativeFileSystemStore)
360      RetryProxy.create(NativeFileSystemStore.class, store,
361          methodNameToPolicyMap);
362  }
363  
364  private static String pathToKey(Path path) {
365    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
366      // allow uris without trailing slash after bucket to refer to root,
367      // like s3n://mybucket
368      return "";
369    }
370    if (!path.isAbsolute()) {
371      throw new IllegalArgumentException("Path must be absolute: " + path);
372    }
373    String ret = path.toUri().getPath().substring(1); // remove initial slash
374    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
375      ret = ret.substring(0, ret.length() -1);
376  }
377    return ret;
378  }
379  
380  private static Path keyToPath(String key) {
381    return new Path("/" + key);
382  }
383  
384  private Path makeAbsolute(Path path) {
385    if (path.isAbsolute()) {
386      return path;
387    }
388    return new Path(workingDir, path);
389  }
390
391  /** This optional operation is not yet supported. */
392  @Override
393  public FSDataOutputStream append(Path f, int bufferSize,
394      Progressable progress) throws IOException {
395    throw new IOException("Not supported");
396  }
397  
398  @Override
399  public FSDataOutputStream create(Path f, FsPermission permission,
400      boolean overwrite, int bufferSize, short replication, long blockSize,
401      Progressable progress) throws IOException {
402
403    if (exists(f) && !overwrite) {
404      throw new FileAlreadyExistsException("File already exists: " + f);
405    }
406    
407    if(LOG.isDebugEnabled()) {
408      LOG.debug("Creating new file '" + f + "' in S3");
409    }
410    Path absolutePath = makeAbsolute(f);
411    String key = pathToKey(absolutePath);
412    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
413        key, progress, bufferSize), statistics);
414  }
415  
416  @Override
417  public boolean delete(Path f, boolean recurse) throws IOException {
418    FileStatus status;
419    try {
420      status = getFileStatus(f);
421    } catch (FileNotFoundException e) {
422      if(LOG.isDebugEnabled()) {
423        LOG.debug("Delete called for '" + f +
424            "' but file does not exist, so returning false");
425      }
426      return false;
427    }
428    Path absolutePath = makeAbsolute(f);
429    String key = pathToKey(absolutePath);
430    if (status.isDirectory()) {
431      if (!recurse && listStatus(f).length > 0) {
432        throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
433      }
434
435      createParent(f);
436
437      if(LOG.isDebugEnabled()) {
438        LOG.debug("Deleting directory '" + f  + "'");
439      }
440      String priorLastKey = null;
441      do {
442        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
443        for (FileMetadata file : listing.getFiles()) {
444          store.delete(file.getKey());
445        }
446        priorLastKey = listing.getPriorLastKey();
447      } while (priorLastKey != null);
448
449      try {
450        store.delete(key + FOLDER_SUFFIX);
451      } catch (FileNotFoundException e) {
452        //this is fine, we don't require a marker
453      }
454    } else {
455      if(LOG.isDebugEnabled()) {
456        LOG.debug("Deleting file '" + f + "'");
457      }
458      createParent(f);
459      store.delete(key);
460    }
461    return true;
462  }
463
464  @Override
465  public FileStatus getFileStatus(Path f) throws IOException {
466    Path absolutePath = makeAbsolute(f);
467    String key = pathToKey(absolutePath);
468    
469    if (key.length() == 0) { // root always exists
470      return newDirectory(absolutePath);
471    }
472    
473    if(LOG.isDebugEnabled()) {
474      LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
475    }
476    FileMetadata meta = store.retrieveMetadata(key);
477    if (meta != null) {
478      if(LOG.isDebugEnabled()) {
479        LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
480      }
481      return newFile(meta, absolutePath);
482    }
483    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
484      if(LOG.isDebugEnabled()) {
485        LOG.debug("getFileStatus returning 'directory' for key '" + key +
486            "' as '" + key + FOLDER_SUFFIX + "' exists");
487      }
488      return newDirectory(absolutePath);
489    }
490    
491    if(LOG.isDebugEnabled()) {
492      LOG.debug("getFileStatus listing key '" + key + "'");
493    }
494    PartialListing listing = store.list(key, 1);
495    if (listing.getFiles().length > 0 ||
496        listing.getCommonPrefixes().length > 0) {
497      if(LOG.isDebugEnabled()) {
498        LOG.debug("getFileStatus returning 'directory' for key '" + key +
499            "' as it has contents");
500      }
501      return newDirectory(absolutePath);
502    }
503    
504    if(LOG.isDebugEnabled()) {
505      LOG.debug("getFileStatus could not find key '" + key + "'");
506    }
507    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
508  }
509
510  @Override
511  public URI getUri() {
512    return uri;
513  }
514
515  /**
516   * <p>
517   * If <code>f</code> is a file, this method will make a single call to S3.
518   * If <code>f</code> is a directory, this method will make a maximum of
519   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
520   * files and directories contained directly in <code>f</code>.
521   * </p>
522   */
523  @Override
524  public FileStatus[] listStatus(Path f) throws IOException {
525
526    Path absolutePath = makeAbsolute(f);
527    String key = pathToKey(absolutePath);
528    
529    if (key.length() > 0) {
530      FileMetadata meta = store.retrieveMetadata(key);
531      if (meta != null) {
532        return new FileStatus[] { newFile(meta, absolutePath) };
533      }
534    }
535    
536    URI pathUri = absolutePath.toUri();
537    Set<FileStatus> status = new TreeSet<FileStatus>();
538    String priorLastKey = null;
539    do {
540      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
541      for (FileMetadata fileMetadata : listing.getFiles()) {
542        Path subpath = keyToPath(fileMetadata.getKey());
543        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
544
545        if (fileMetadata.getKey().equals(key + "/")) {
546          // this is just the directory we have been asked to list
547        }
548        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
549          status.add(newDirectory(new Path(
550              absolutePath,
551              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
552        }
553        else {
554          status.add(newFile(fileMetadata, subpath));
555        }
556      }
557      for (String commonPrefix : listing.getCommonPrefixes()) {
558        Path subpath = keyToPath(commonPrefix);
559        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
560        status.add(newDirectory(new Path(absolutePath, relativePath)));
561      }
562      priorLastKey = listing.getPriorLastKey();
563    } while (priorLastKey != null);
564    
565    if (status.isEmpty() &&
566        key.length() > 0 &&
567        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
568      throw new FileNotFoundException("File " + f + " does not exist.");
569    }
570    
571    return status.toArray(new FileStatus[status.size()]);
572  }
573  
574  private FileStatus newFile(FileMetadata meta, Path path) {
575    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
576        meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
577  }
578  
579  private FileStatus newDirectory(Path path) {
580    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
581  }
582
583  @Override
584  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
585    Path absolutePath = makeAbsolute(f);
586    List<Path> paths = new ArrayList<Path>();
587    do {
588      paths.add(0, absolutePath);
589      absolutePath = absolutePath.getParent();
590    } while (absolutePath != null);
591    
592    boolean result = true;
593    for (Path path : paths) {
594      result &= mkdir(path);
595    }
596    return result;
597  }
598  
599  private boolean mkdir(Path f) throws IOException {
600    try {
601      FileStatus fileStatus = getFileStatus(f);
602      if (fileStatus.isFile()) {
603        throw new FileAlreadyExistsException(String.format(
604            "Can't make directory for path '%s' since it is a file.", f));
605
606      }
607    } catch (FileNotFoundException e) {
608      if(LOG.isDebugEnabled()) {
609        LOG.debug("Making dir '" + f + "' in S3");
610      }
611      String key = pathToKey(f) + FOLDER_SUFFIX;
612      store.storeEmptyFile(key);    
613    }
614    return true;
615  }
616
617  @Override
618  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
619    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
620    if (fs.isDirectory()) {
621      throw new FileNotFoundException("'" + f + "' is a directory");
622    }
623    LOG.info("Opening '" + f + "' for reading");
624    Path absolutePath = makeAbsolute(f);
625    String key = pathToKey(absolutePath);
626    return new FSDataInputStream(new BufferedFSInputStream(
627        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
628  }
629  
630  // rename() and delete() use this method to ensure that the parent directory
631  // of the source does not vanish.
632  private void createParent(Path path) throws IOException {
633    Path parent = path.getParent();
634    if (parent != null) {
635      String key = pathToKey(makeAbsolute(parent));
636      if (key.length() > 0) {
637          store.storeEmptyFile(key + FOLDER_SUFFIX);
638      }
639    }
640  }
641  
642    
643  @Override
644  public boolean rename(Path src, Path dst) throws IOException {
645
646    String srcKey = pathToKey(makeAbsolute(src));
647
648    if (srcKey.length() == 0) {
649      // Cannot rename root of file system
650      return false;
651    }
652
653    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
654
655    // Figure out the final destination
656    String dstKey;
657    try {
658      boolean dstIsFile = getFileStatus(dst).isFile();
659      if (dstIsFile) {
660        if(LOG.isDebugEnabled()) {
661          LOG.debug(debugPreamble +
662              "returning false as dst is an already existing file");
663        }
664        return false;
665      } else {
666        if(LOG.isDebugEnabled()) {
667          LOG.debug(debugPreamble + "using dst as output directory");
668        }
669        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
670      }
671    } catch (FileNotFoundException e) {
672      if(LOG.isDebugEnabled()) {
673        LOG.debug(debugPreamble + "using dst as output destination");
674      }
675      dstKey = pathToKey(makeAbsolute(dst));
676      try {
677        if (getFileStatus(dst.getParent()).isFile()) {
678          if(LOG.isDebugEnabled()) {
679            LOG.debug(debugPreamble +
680                "returning false as dst parent exists and is a file");
681          }
682          return false;
683        }
684      } catch (FileNotFoundException ex) {
685        if(LOG.isDebugEnabled()) {
686          LOG.debug(debugPreamble +
687              "returning false as dst parent does not exist");
688        }
689        return false;
690      }
691    }
692
693    boolean srcIsFile;
694    try {
695      srcIsFile = getFileStatus(src).isFile();
696    } catch (FileNotFoundException e) {
697      if(LOG.isDebugEnabled()) {
698        LOG.debug(debugPreamble + "returning false as src does not exist");
699      }
700      return false;
701    }
702    if (srcIsFile) {
703      if(LOG.isDebugEnabled()) {
704        LOG.debug(debugPreamble +
705            "src is file, so doing copy then delete in S3");
706      }
707      store.copy(srcKey, dstKey);
708      store.delete(srcKey);
709    } else {
710      if(LOG.isDebugEnabled()) {
711        LOG.debug(debugPreamble + "src is directory, so copying contents");
712      }
713      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
714
715      List<String> keysToDelete = new ArrayList<String>();
716      String priorLastKey = null;
717      do {
718        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
719        for (FileMetadata file : listing.getFiles()) {
720          keysToDelete.add(file.getKey());
721          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
722        }
723        priorLastKey = listing.getPriorLastKey();
724      } while (priorLastKey != null);
725
726      if(LOG.isDebugEnabled()) {
727        LOG.debug(debugPreamble +
728            "all files in src copied, now removing src files");
729      }
730      for (String key: keysToDelete) {
731        store.delete(key);
732      }
733
734      try {
735        store.delete(srcKey + FOLDER_SUFFIX);
736      } catch (FileNotFoundException e) {
737        //this is fine, we don't require a marker
738      }
739      if(LOG.isDebugEnabled()) {
740        LOG.debug(debugPreamble + "done");
741      }
742    }
743
744    return true;
745  }
746  
747  @Override
748  public long getDefaultBlockSize() {
749    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
750  }
751
752  /**
753   * Set the working directory to the given directory.
754   */
755  @Override
756  public void setWorkingDirectory(Path newDir) {
757    workingDir = newDir;
758  }
759  
760  @Override
761  public Path getWorkingDirectory() {
762    return workingDir;
763  }
764
765  @Override
766  public String getCanonicalServiceName() {
767    // Does not support Token
768    return null;
769  }
770}