001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3native;
020
021import java.io.BufferedOutputStream;
022import java.io.EOFException;
023import java.io.File;
024import java.io.FileNotFoundException;
025import java.io.FileOutputStream;
026import java.io.IOException;
027import java.io.InputStream;
028import java.io.OutputStream;
029import java.net.URI;
030import java.security.DigestOutputStream;
031import java.security.MessageDigest;
032import java.security.NoSuchAlgorithmException;
033import java.util.ArrayList;
034import java.util.HashMap;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeSet;
039import java.util.concurrent.TimeUnit;
040
041import com.google.common.base.Preconditions;
042import org.apache.hadoop.classification.InterfaceAudience;
043import org.apache.hadoop.classification.InterfaceStability;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.BufferedFSInputStream;
046import org.apache.hadoop.fs.FSDataInputStream;
047import org.apache.hadoop.fs.FSDataOutputStream;
048import org.apache.hadoop.fs.FSExceptionMessages;
049import org.apache.hadoop.fs.FSInputStream;
050import org.apache.hadoop.fs.FileAlreadyExistsException;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.LocalDirAllocator;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.permission.FsPermission;
056import org.apache.hadoop.fs.s3.S3Exception;
057import org.apache.hadoop.io.IOUtils;
058import org.apache.hadoop.io.retry.RetryPolicies;
059import org.apache.hadoop.io.retry.RetryPolicy;
060import org.apache.hadoop.io.retry.RetryProxy;
061import org.apache.hadoop.util.Progressable;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * <p>
067 * A {@link FileSystem} for reading and writing files stored on
068 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
069 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
070 * stores files on S3 in their
071 * native form so they can be read by other S3 tools.
072 *
073 * A note about directories. S3 of course has no "native" support for them.
074 * The idiom we choose then is: for any directory created by this class,
075 * we use an empty object "#{dirpath}_$folder$" as a marker.
076 * Further, to interoperate with other S3 tools, we also accept the following:
077 *  - an object "#{dirpath}/' denoting a directory marker
078 *  - if there exists any objects with the prefix "#{dirpath}/", then the
079 *    directory is said to exist
080 *  - if both a file with the name of a directory and a marker for that
081 *    directory exists, then the *file masks the directory*, and the directory
082 *    is never returned.
083 * </p>
084 * @see org.apache.hadoop.fs.s3.S3FileSystem
085 */
086@InterfaceAudience.Public
087@InterfaceStability.Stable
088public class NativeS3FileSystem extends FileSystem {
089  
090  public static final Logger LOG =
091      LoggerFactory.getLogger(NativeS3FileSystem.class);
092  
093  private static final String FOLDER_SUFFIX = "_$folder$";
094  static final String PATH_DELIMITER = Path.SEPARATOR;
095  private static final int S3_MAX_LISTING_LENGTH = 1000;
096  
097  static class NativeS3FsInputStream extends FSInputStream {
098    
099    private NativeFileSystemStore store;
100    private Statistics statistics;
101    private InputStream in;
102    private final String key;
103    private long pos = 0;
104    
105    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
106      Preconditions.checkNotNull(in, "Null input stream");
107      this.store = store;
108      this.statistics = statistics;
109      this.in = in;
110      this.key = key;
111    }
112    
113    @Override
114    public synchronized int read() throws IOException {
115      int result;
116      try {
117        result = in.read();
118      } catch (IOException e) {
119        LOG.info("Received IOException while reading '{}', attempting to reopen",
120            key);
121        LOG.debug("{}", e, e);
122        try {
123          reopen(pos);
124          result = in.read();
125        } catch (EOFException eof) {
126          LOG.debug("EOF on input stream read: {}", eof, eof);
127          result = -1;
128        }
129      } 
130      if (result != -1) {
131        pos++;
132      }
133      if (statistics != null && result != -1) {
134        statistics.incrementBytesRead(1);
135      }
136      return result;
137    }
138    @Override
139    public synchronized int read(byte[] b, int off, int len)
140      throws IOException {
141      if (in == null) {
142        throw new EOFException("Cannot read closed stream");
143      }
144      int result = -1;
145      try {
146        result = in.read(b, off, len);
147      } catch (EOFException eof) {
148        throw eof;
149      } catch (IOException e) {
150        LOG.info( "Received IOException while reading '{}'," +
151                  " attempting to reopen.", key);
152        reopen(pos);
153        result = in.read(b, off, len);
154      }
155      if (result > 0) {
156        pos += result;
157      }
158      if (statistics != null && result > 0) {
159        statistics.incrementBytesRead(result);
160      }
161      return result;
162    }
163
164    @Override
165    public synchronized void close() throws IOException {
166      closeInnerStream();
167    }
168
169    /**
170     * Close the inner stream if not null. Even if an exception
171     * is raised during the close, the field is set to null
172     */
173    private void closeInnerStream() {
174      IOUtils.closeStream(in);
175      in = null;
176    }
177
178    /**
179     * Reopen a new input stream with the specified position
180     * @param pos the position to reopen a new stream
181     * @throws IOException
182     */
183    private synchronized void reopen(long pos) throws IOException {
184        LOG.debug("Reopening key '{}' for reading at position '{}", key, pos);
185        InputStream newStream = store.retrieve(key, pos);
186        updateInnerStream(newStream, pos);
187    }
188
189    /**
190     * Update inner stream with a new stream and position
191     * @param newStream new stream -must not be null
192     * @param newpos new position
193     * @throws IOException IO exception on a failure to close the existing
194     * stream.
195     */
196    private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
197      Preconditions.checkNotNull(newStream, "Null newstream argument");
198      closeInnerStream();
199      in = newStream;
200      this.pos = newpos;
201    }
202
203    @Override
204    public synchronized void seek(long newpos) throws IOException {
205      if (newpos < 0) {
206        throw new EOFException(
207            FSExceptionMessages.NEGATIVE_SEEK);
208      }
209      if (pos != newpos) {
210        // the seek is attempting to move the current position
211        reopen(newpos);
212      }
213    }
214
215    @Override
216    public synchronized long getPos() throws IOException {
217      return pos;
218    }
219    @Override
220    public boolean seekToNewSource(long targetPos) throws IOException {
221      return false;
222    }
223  }
224  
225  private class NativeS3FsOutputStream extends OutputStream {
226    
227    private Configuration conf;
228    private String key;
229    private File backupFile;
230    private OutputStream backupStream;
231    private MessageDigest digest;
232    private boolean closed;
233    private LocalDirAllocator lDirAlloc;
234    
235    public NativeS3FsOutputStream(Configuration conf,
236        NativeFileSystemStore store, String key, Progressable progress,
237        int bufferSize) throws IOException {
238      this.conf = conf;
239      this.key = key;
240      this.backupFile = newBackupFile();
241      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
242      try {
243        this.digest = MessageDigest.getInstance("MD5");
244        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
245            new FileOutputStream(backupFile), this.digest));
246      } catch (NoSuchAlgorithmException e) {
247        LOG.warn("Cannot load MD5 digest algorithm," +
248            "skipping message integrity check.", e);
249        this.backupStream = new BufferedOutputStream(
250            new FileOutputStream(backupFile));
251      }
252    }
253
254    private File newBackupFile() throws IOException {
255      if (lDirAlloc == null) {
256        lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir");
257      }
258      File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
259      result.deleteOnExit();
260      return result;
261    }
262    
263    @Override
264    public void flush() throws IOException {
265      backupStream.flush();
266    }
267    
268    @Override
269    public synchronized void close() throws IOException {
270      if (closed) {
271        return;
272      }
273
274      backupStream.close();
275      LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
276      
277      try {
278        byte[] md5Hash = digest == null ? null : digest.digest();
279        store.storeFile(key, backupFile, md5Hash);
280      } finally {
281        if (!backupFile.delete()) {
282          LOG.warn("Could not delete temporary s3n file: " + backupFile);
283        }
284        super.close();
285        closed = true;
286      } 
287      LOG.info("OutputStream for key '{}' upload complete", key);
288    }
289
290    @Override
291    public void write(int b) throws IOException {
292      backupStream.write(b);
293    }
294
295    @Override
296    public void write(byte[] b, int off, int len) throws IOException {
297      backupStream.write(b, off, len);
298    }
299  }
300  
301  private URI uri;
302  private NativeFileSystemStore store;
303  private Path workingDir;
304  
305  public NativeS3FileSystem() {
306    // set store in initialize()
307  }
308  
309  public NativeS3FileSystem(NativeFileSystemStore store) {
310    this.store = store;
311  }
312
313  /**
314   * Return the protocol scheme for the FileSystem.
315   * <p/>
316   *
317   * @return <code>s3n</code>
318   */
319  @Override
320  public String getScheme() {
321    return "s3n";
322  }
323
324  @Override
325  public void initialize(URI uri, Configuration conf) throws IOException {
326    super.initialize(uri, conf);
327    if (store == null) {
328      store = createDefaultStore(conf);
329    }
330    store.initialize(uri, conf);
331    setConf(conf);
332    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
333    this.workingDir =
334      new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
335  }
336  
337  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
338    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
339    
340    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
341        conf.getInt("fs.s3.maxRetries", 4),
342        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
343    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
344      new HashMap<Class<? extends Exception>, RetryPolicy>();
345    exceptionToPolicyMap.put(IOException.class, basePolicy);
346    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
347    
348    RetryPolicy methodPolicy = RetryPolicies.retryByException(
349        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
350    Map<String, RetryPolicy> methodNameToPolicyMap =
351      new HashMap<String, RetryPolicy>();
352    methodNameToPolicyMap.put("storeFile", methodPolicy);
353    methodNameToPolicyMap.put("rename", methodPolicy);
354    
355    return (NativeFileSystemStore)
356      RetryProxy.create(NativeFileSystemStore.class, store,
357          methodNameToPolicyMap);
358  }
359  
360  private static String pathToKey(Path path) {
361    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
362      // allow uris without trailing slash after bucket to refer to root,
363      // like s3n://mybucket
364      return "";
365    }
366    if (!path.isAbsolute()) {
367      throw new IllegalArgumentException("Path must be absolute: " + path);
368    }
369    String ret = path.toUri().getPath().substring(1); // remove initial slash
370    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
371      ret = ret.substring(0, ret.length() -1);
372  }
373    return ret;
374  }
375  
376  private static Path keyToPath(String key) {
377    return new Path("/" + key);
378  }
379  
380  private Path makeAbsolute(Path path) {
381    if (path.isAbsolute()) {
382      return path;
383    }
384    return new Path(workingDir, path);
385  }
386
387  /** This optional operation is not yet supported. */
388  @Override
389  public FSDataOutputStream append(Path f, int bufferSize,
390      Progressable progress) throws IOException {
391    throw new IOException("Not supported");
392  }
393  
394  @Override
395  public FSDataOutputStream create(Path f, FsPermission permission,
396      boolean overwrite, int bufferSize, short replication, long blockSize,
397      Progressable progress) throws IOException {
398
399    if (exists(f) && !overwrite) {
400      throw new FileAlreadyExistsException("File already exists: " + f);
401    }
402    
403    if(LOG.isDebugEnabled()) {
404      LOG.debug("Creating new file '" + f + "' in S3");
405    }
406    Path absolutePath = makeAbsolute(f);
407    String key = pathToKey(absolutePath);
408    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
409        key, progress, bufferSize), statistics);
410  }
411  
412  @Override
413  public boolean delete(Path f, boolean recurse) throws IOException {
414    FileStatus status;
415    try {
416      status = getFileStatus(f);
417    } catch (FileNotFoundException e) {
418      if(LOG.isDebugEnabled()) {
419        LOG.debug("Delete called for '" + f +
420            "' but file does not exist, so returning false");
421      }
422      return false;
423    }
424    Path absolutePath = makeAbsolute(f);
425    String key = pathToKey(absolutePath);
426    if (status.isDirectory()) {
427      if (!recurse && listStatus(f).length > 0) {
428        throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
429      }
430
431      createParent(f);
432
433      if(LOG.isDebugEnabled()) {
434        LOG.debug("Deleting directory '" + f  + "'");
435      }
436      String priorLastKey = null;
437      do {
438        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
439        for (FileMetadata file : listing.getFiles()) {
440          store.delete(file.getKey());
441        }
442        priorLastKey = listing.getPriorLastKey();
443      } while (priorLastKey != null);
444
445      try {
446        store.delete(key + FOLDER_SUFFIX);
447      } catch (FileNotFoundException e) {
448        //this is fine, we don't require a marker
449      }
450    } else {
451      if(LOG.isDebugEnabled()) {
452        LOG.debug("Deleting file '" + f + "'");
453      }
454      createParent(f);
455      store.delete(key);
456    }
457    return true;
458  }
459
460  @Override
461  public FileStatus getFileStatus(Path f) throws IOException {
462    Path absolutePath = makeAbsolute(f);
463    String key = pathToKey(absolutePath);
464    
465    if (key.length() == 0) { // root always exists
466      return newDirectory(absolutePath);
467    }
468    
469    if(LOG.isDebugEnabled()) {
470      LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
471    }
472    FileMetadata meta = store.retrieveMetadata(key);
473    if (meta != null) {
474      if(LOG.isDebugEnabled()) {
475        LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
476      }
477      return newFile(meta, absolutePath);
478    }
479    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
480      if(LOG.isDebugEnabled()) {
481        LOG.debug("getFileStatus returning 'directory' for key '" + key +
482            "' as '" + key + FOLDER_SUFFIX + "' exists");
483      }
484      return newDirectory(absolutePath);
485    }
486    
487    if(LOG.isDebugEnabled()) {
488      LOG.debug("getFileStatus listing key '" + key + "'");
489    }
490    PartialListing listing = store.list(key, 1);
491    if (listing.getFiles().length > 0 ||
492        listing.getCommonPrefixes().length > 0) {
493      if(LOG.isDebugEnabled()) {
494        LOG.debug("getFileStatus returning 'directory' for key '" + key +
495            "' as it has contents");
496      }
497      return newDirectory(absolutePath);
498    }
499    
500    if(LOG.isDebugEnabled()) {
501      LOG.debug("getFileStatus could not find key '" + key + "'");
502    }
503    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
504  }
505
506  @Override
507  public URI getUri() {
508    return uri;
509  }
510
511  /**
512   * <p>
513   * If <code>f</code> is a file, this method will make a single call to S3.
514   * If <code>f</code> is a directory, this method will make a maximum of
515   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
516   * files and directories contained directly in <code>f</code>.
517   * </p>
518   */
519  @Override
520  public FileStatus[] listStatus(Path f) throws IOException {
521
522    Path absolutePath = makeAbsolute(f);
523    String key = pathToKey(absolutePath);
524    
525    if (key.length() > 0) {
526      FileMetadata meta = store.retrieveMetadata(key);
527      if (meta != null) {
528        return new FileStatus[] { newFile(meta, absolutePath) };
529      }
530    }
531    
532    URI pathUri = absolutePath.toUri();
533    Set<FileStatus> status = new TreeSet<FileStatus>();
534    String priorLastKey = null;
535    do {
536      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
537      for (FileMetadata fileMetadata : listing.getFiles()) {
538        Path subpath = keyToPath(fileMetadata.getKey());
539        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
540
541        if (fileMetadata.getKey().equals(key + "/")) {
542          // this is just the directory we have been asked to list
543        }
544        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
545          status.add(newDirectory(new Path(
546              absolutePath,
547              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
548        }
549        else {
550          status.add(newFile(fileMetadata, subpath));
551        }
552      }
553      for (String commonPrefix : listing.getCommonPrefixes()) {
554        Path subpath = keyToPath(commonPrefix);
555        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
556        status.add(newDirectory(new Path(absolutePath, relativePath)));
557      }
558      priorLastKey = listing.getPriorLastKey();
559    } while (priorLastKey != null);
560    
561    if (status.isEmpty() &&
562        key.length() > 0 &&
563        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
564      throw new FileNotFoundException("File " + f + " does not exist.");
565    }
566    
567    return status.toArray(new FileStatus[status.size()]);
568  }
569  
570  private FileStatus newFile(FileMetadata meta, Path path) {
571    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
572        meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
573  }
574  
575  private FileStatus newDirectory(Path path) {
576    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
577  }
578
579  @Override
580  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
581    Path absolutePath = makeAbsolute(f);
582    List<Path> paths = new ArrayList<Path>();
583    do {
584      paths.add(0, absolutePath);
585      absolutePath = absolutePath.getParent();
586    } while (absolutePath != null);
587    
588    boolean result = true;
589    for (Path path : paths) {
590      result &= mkdir(path);
591    }
592    return result;
593  }
594  
595  private boolean mkdir(Path f) throws IOException {
596    try {
597      FileStatus fileStatus = getFileStatus(f);
598      if (fileStatus.isFile()) {
599        throw new FileAlreadyExistsException(String.format(
600            "Can't make directory for path '%s' since it is a file.", f));
601
602      }
603    } catch (FileNotFoundException e) {
604      if(LOG.isDebugEnabled()) {
605        LOG.debug("Making dir '" + f + "' in S3");
606      }
607      String key = pathToKey(f) + FOLDER_SUFFIX;
608      store.storeEmptyFile(key);    
609    }
610    return true;
611  }
612
613  @Override
614  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
615    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
616    if (fs.isDirectory()) {
617      throw new FileNotFoundException("'" + f + "' is a directory");
618    }
619    LOG.info("Opening '" + f + "' for reading");
620    Path absolutePath = makeAbsolute(f);
621    String key = pathToKey(absolutePath);
622    return new FSDataInputStream(new BufferedFSInputStream(
623        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
624  }
625  
626  // rename() and delete() use this method to ensure that the parent directory
627  // of the source does not vanish.
628  private void createParent(Path path) throws IOException {
629    Path parent = path.getParent();
630    if (parent != null) {
631      String key = pathToKey(makeAbsolute(parent));
632      if (key.length() > 0) {
633          store.storeEmptyFile(key + FOLDER_SUFFIX);
634      }
635    }
636  }
637  
638    
639  @Override
640  public boolean rename(Path src, Path dst) throws IOException {
641
642    String srcKey = pathToKey(makeAbsolute(src));
643
644    if (srcKey.length() == 0) {
645      // Cannot rename root of file system
646      return false;
647    }
648
649    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
650
651    // Figure out the final destination
652    String dstKey;
653    try {
654      boolean dstIsFile = getFileStatus(dst).isFile();
655      if (dstIsFile) {
656        if(LOG.isDebugEnabled()) {
657          LOG.debug(debugPreamble +
658              "returning false as dst is an already existing file");
659        }
660        return false;
661      } else {
662        if(LOG.isDebugEnabled()) {
663          LOG.debug(debugPreamble + "using dst as output directory");
664        }
665        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
666      }
667    } catch (FileNotFoundException e) {
668      if(LOG.isDebugEnabled()) {
669        LOG.debug(debugPreamble + "using dst as output destination");
670      }
671      dstKey = pathToKey(makeAbsolute(dst));
672      try {
673        if (getFileStatus(dst.getParent()).isFile()) {
674          if(LOG.isDebugEnabled()) {
675            LOG.debug(debugPreamble +
676                "returning false as dst parent exists and is a file");
677          }
678          return false;
679        }
680      } catch (FileNotFoundException ex) {
681        if(LOG.isDebugEnabled()) {
682          LOG.debug(debugPreamble +
683              "returning false as dst parent does not exist");
684        }
685        return false;
686      }
687    }
688
689    boolean srcIsFile;
690    try {
691      srcIsFile = getFileStatus(src).isFile();
692    } catch (FileNotFoundException e) {
693      if(LOG.isDebugEnabled()) {
694        LOG.debug(debugPreamble + "returning false as src does not exist");
695      }
696      return false;
697    }
698    if (srcIsFile) {
699      if(LOG.isDebugEnabled()) {
700        LOG.debug(debugPreamble +
701            "src is file, so doing copy then delete in S3");
702      }
703      store.copy(srcKey, dstKey);
704      store.delete(srcKey);
705    } else {
706      if(LOG.isDebugEnabled()) {
707        LOG.debug(debugPreamble + "src is directory, so copying contents");
708      }
709      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
710
711      List<String> keysToDelete = new ArrayList<String>();
712      String priorLastKey = null;
713      do {
714        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
715        for (FileMetadata file : listing.getFiles()) {
716          keysToDelete.add(file.getKey());
717          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
718        }
719        priorLastKey = listing.getPriorLastKey();
720      } while (priorLastKey != null);
721
722      if(LOG.isDebugEnabled()) {
723        LOG.debug(debugPreamble +
724            "all files in src copied, now removing src files");
725      }
726      for (String key: keysToDelete) {
727        store.delete(key);
728      }
729
730      try {
731        store.delete(srcKey + FOLDER_SUFFIX);
732      } catch (FileNotFoundException e) {
733        //this is fine, we don't require a marker
734      }
735      if(LOG.isDebugEnabled()) {
736        LOG.debug(debugPreamble + "done");
737      }
738    }
739
740    return true;
741  }
742  
743  @Override
744  public long getDefaultBlockSize() {
745    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
746  }
747
748  /**
749   * Set the working directory to the given directory.
750   */
751  @Override
752  public void setWorkingDirectory(Path newDir) {
753    workingDir = newDir;
754  }
755  
756  @Override
757  public Path getWorkingDirectory() {
758    return workingDir;
759  }
760
761  @Override
762  public String getCanonicalServiceName() {
763    // Does not support Token
764    return null;
765  }
766}