001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs.s3native;
020    
021    import java.io.BufferedOutputStream;
022    import java.io.EOFException;
023    import java.io.File;
024    import java.io.FileNotFoundException;
025    import java.io.FileOutputStream;
026    import java.io.IOException;
027    import java.io.InputStream;
028    import java.io.OutputStream;
029    import java.net.URI;
030    import java.security.DigestOutputStream;
031    import java.security.MessageDigest;
032    import java.security.NoSuchAlgorithmException;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.Set;
038    import java.util.TreeSet;
039    import java.util.concurrent.TimeUnit;
040    
041    import com.google.common.base.Preconditions;
042    import org.apache.hadoop.classification.InterfaceAudience;
043    import org.apache.hadoop.classification.InterfaceStability;
044    import org.apache.hadoop.conf.Configuration;
045    import org.apache.hadoop.fs.BufferedFSInputStream;
046    import org.apache.hadoop.fs.FSDataInputStream;
047    import org.apache.hadoop.fs.FSDataOutputStream;
048    import org.apache.hadoop.fs.FSExceptionMessages;
049    import org.apache.hadoop.fs.FSInputStream;
050    import org.apache.hadoop.fs.FileAlreadyExistsException;
051    import org.apache.hadoop.fs.FileStatus;
052    import org.apache.hadoop.fs.FileSystem;
053    import org.apache.hadoop.fs.LocalDirAllocator;
054    import org.apache.hadoop.fs.Path;
055    import org.apache.hadoop.fs.permission.FsPermission;
056    import org.apache.hadoop.fs.s3.S3Exception;
057    import org.apache.hadoop.io.retry.RetryPolicies;
058    import org.apache.hadoop.io.retry.RetryPolicy;
059    import org.apache.hadoop.io.retry.RetryProxy;
060    import org.apache.hadoop.util.Progressable;
061    import org.slf4j.Logger;
062    import org.slf4j.LoggerFactory;
063    
064    /**
065     * <p>
066     * A {@link FileSystem} for reading and writing files stored on
067     * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
068     * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
069     * stores files on S3 in their
070     * native form so they can be read by other S3 tools.
071     *
072     * A note about directories. S3 of course has no "native" support for them.
073     * The idiom we choose then is: for any directory created by this class,
074     * we use an empty object "#{dirpath}_$folder$" as a marker.
075     * Further, to interoperate with other S3 tools, we also accept the following:
076     *  - an object "#{dirpath}/' denoting a directory marker
077     *  - if there exists any objects with the prefix "#{dirpath}/", then the
078     *    directory is said to exist
079     *  - if both a file with the name of a directory and a marker for that
080     *    directory exists, then the *file masks the directory*, and the directory
081     *    is never returned.
082     * </p>
083     * @see org.apache.hadoop.fs.s3.S3FileSystem
084     */
085    @InterfaceAudience.Public
086    @InterfaceStability.Stable
087    public class NativeS3FileSystem extends FileSystem {
088      
089      public static final Logger LOG =
090          LoggerFactory.getLogger(NativeS3FileSystem.class);
091      
092      private static final String FOLDER_SUFFIX = "_$folder$";
093      static final String PATH_DELIMITER = Path.SEPARATOR;
094      private static final int S3_MAX_LISTING_LENGTH = 1000;
095      
096      static class NativeS3FsInputStream extends FSInputStream {
097        
098        private NativeFileSystemStore store;
099        private Statistics statistics;
100        private InputStream in;
101        private final String key;
102        private long pos = 0;
103        
104        public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
105          Preconditions.checkNotNull(in, "Null input stream");
106          this.store = store;
107          this.statistics = statistics;
108          this.in = in;
109          this.key = key;
110        }
111        
112        @Override
113        public synchronized int read() throws IOException {
114          int result;
115          try {
116            result = in.read();
117          } catch (IOException e) {
118            LOG.info("Received IOException while reading '{}', attempting to reopen",
119                key);
120            LOG.debug("{}", e, e);
121            try {
122              seek(pos);
123              result = in.read();
124            } catch (EOFException eof) {
125              LOG.debug("EOF on input stream read: {}", eof, eof);
126              result = -1;
127            }
128          } 
129          if (result != -1) {
130            pos++;
131          }
132          if (statistics != null && result != -1) {
133            statistics.incrementBytesRead(1);
134          }
135          return result;
136        }
137        @Override
138        public synchronized int read(byte[] b, int off, int len)
139          throws IOException {
140          if (in == null) {
141            throw new EOFException("Cannot read closed stream");
142          }
143          int result = -1;
144          try {
145            result = in.read(b, off, len);
146          } catch (EOFException eof) {
147            throw eof;
148          } catch (IOException e) {
149            LOG.info( "Received IOException while reading '{}'," +
150                      " attempting to reopen.", key);
151            seek(pos);
152            result = in.read(b, off, len);
153          }
154          if (result > 0) {
155            pos += result;
156          }
157          if (statistics != null && result > 0) {
158            statistics.incrementBytesRead(result);
159          }
160          return result;
161        }
162    
163        @Override
164        public synchronized void close() throws IOException {
165          closeInnerStream();
166        }
167    
168        /**
169         * Close the inner stream if not null. Even if an exception
170         * is raised during the close, the field is set to null
171         * @throws IOException if raised by the close() operation.
172         */
173        private void closeInnerStream() throws IOException {
174          if (in != null) {
175            try {
176              in.close();
177            } finally {
178              in = null;
179            }
180          }
181        }
182    
183        /**
184         * Update inner stream with a new stream and position
185         * @param newStream new stream -must not be null
186         * @param newpos new position
187         * @throws IOException IO exception on a failure to close the existing
188         * stream.
189         */
190        private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
191          Preconditions.checkNotNull(newStream, "Null newstream argument");
192          closeInnerStream();
193          in = newStream;
194          this.pos = newpos;
195        }
196    
197        @Override
198        public synchronized void seek(long newpos) throws IOException {
199          if (newpos < 0) {
200            throw new EOFException(
201                FSExceptionMessages.NEGATIVE_SEEK);
202          }
203          if (pos != newpos) {
204            // the seek is attempting to move the current position
205            LOG.debug("Opening key '{}' for reading at position '{}", key, newpos);
206            InputStream newStream = store.retrieve(key, newpos);
207            updateInnerStream(newStream, newpos);
208          }
209        }
210    
211        @Override
212        public synchronized long getPos() throws IOException {
213          return pos;
214        }
215        @Override
216        public boolean seekToNewSource(long targetPos) throws IOException {
217          return false;
218        }
219      }
220      
221      private class NativeS3FsOutputStream extends OutputStream {
222        
223        private Configuration conf;
224        private String key;
225        private File backupFile;
226        private OutputStream backupStream;
227        private MessageDigest digest;
228        private boolean closed;
229        private LocalDirAllocator lDirAlloc;
230        
231        public NativeS3FsOutputStream(Configuration conf,
232            NativeFileSystemStore store, String key, Progressable progress,
233            int bufferSize) throws IOException {
234          this.conf = conf;
235          this.key = key;
236          this.backupFile = newBackupFile();
237          LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
238          try {
239            this.digest = MessageDigest.getInstance("MD5");
240            this.backupStream = new BufferedOutputStream(new DigestOutputStream(
241                new FileOutputStream(backupFile), this.digest));
242          } catch (NoSuchAlgorithmException e) {
243            LOG.warn("Cannot load MD5 digest algorithm," +
244                "skipping message integrity check.", e);
245            this.backupStream = new BufferedOutputStream(
246                new FileOutputStream(backupFile));
247          }
248        }
249    
250        private File newBackupFile() throws IOException {
251          if (lDirAlloc == null) {
252            lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir");
253          }
254          File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
255          result.deleteOnExit();
256          return result;
257        }
258        
259        @Override
260        public void flush() throws IOException {
261          backupStream.flush();
262        }
263        
264        @Override
265        public synchronized void close() throws IOException {
266          if (closed) {
267            return;
268          }
269    
270          backupStream.close();
271          LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
272          
273          try {
274            byte[] md5Hash = digest == null ? null : digest.digest();
275            store.storeFile(key, backupFile, md5Hash);
276          } finally {
277            if (!backupFile.delete()) {
278              LOG.warn("Could not delete temporary s3n file: " + backupFile);
279            }
280            super.close();
281            closed = true;
282          } 
283          LOG.info("OutputStream for key '{}' upload complete", key);
284        }
285    
286        @Override
287        public void write(int b) throws IOException {
288          backupStream.write(b);
289        }
290    
291        @Override
292        public void write(byte[] b, int off, int len) throws IOException {
293          backupStream.write(b, off, len);
294        }
295      }
296      
297      private URI uri;
298      private NativeFileSystemStore store;
299      private Path workingDir;
300      
301      public NativeS3FileSystem() {
302        // set store in initialize()
303      }
304      
305      public NativeS3FileSystem(NativeFileSystemStore store) {
306        this.store = store;
307      }
308    
309      /**
310       * Return the protocol scheme for the FileSystem.
311       * <p/>
312       *
313       * @return <code>s3n</code>
314       */
315      @Override
316      public String getScheme() {
317        return "s3n";
318      }
319    
320      @Override
321      public void initialize(URI uri, Configuration conf) throws IOException {
322        super.initialize(uri, conf);
323        if (store == null) {
324          store = createDefaultStore(conf);
325        }
326        store.initialize(uri, conf);
327        setConf(conf);
328        this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
329        this.workingDir =
330          new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
331      }
332      
333      private static NativeFileSystemStore createDefaultStore(Configuration conf) {
334        NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
335        
336        RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
337            conf.getInt("fs.s3.maxRetries", 4),
338            conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
339        Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
340          new HashMap<Class<? extends Exception>, RetryPolicy>();
341        exceptionToPolicyMap.put(IOException.class, basePolicy);
342        exceptionToPolicyMap.put(S3Exception.class, basePolicy);
343        
344        RetryPolicy methodPolicy = RetryPolicies.retryByException(
345            RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
346        Map<String, RetryPolicy> methodNameToPolicyMap =
347          new HashMap<String, RetryPolicy>();
348        methodNameToPolicyMap.put("storeFile", methodPolicy);
349        methodNameToPolicyMap.put("rename", methodPolicy);
350        
351        return (NativeFileSystemStore)
352          RetryProxy.create(NativeFileSystemStore.class, store,
353              methodNameToPolicyMap);
354      }
355      
356      private static String pathToKey(Path path) {
357        if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
358          // allow uris without trailing slash after bucket to refer to root,
359          // like s3n://mybucket
360          return "";
361        }
362        if (!path.isAbsolute()) {
363          throw new IllegalArgumentException("Path must be absolute: " + path);
364        }
365        String ret = path.toUri().getPath().substring(1); // remove initial slash
366        if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
367          ret = ret.substring(0, ret.length() -1);
368      }
369        return ret;
370      }
371      
372      private static Path keyToPath(String key) {
373        return new Path("/" + key);
374      }
375      
376      private Path makeAbsolute(Path path) {
377        if (path.isAbsolute()) {
378          return path;
379        }
380        return new Path(workingDir, path);
381      }
382    
383      /** This optional operation is not yet supported. */
384      @Override
385      public FSDataOutputStream append(Path f, int bufferSize,
386          Progressable progress) throws IOException {
387        throw new IOException("Not supported");
388      }
389      
390      @Override
391      public FSDataOutputStream create(Path f, FsPermission permission,
392          boolean overwrite, int bufferSize, short replication, long blockSize,
393          Progressable progress) throws IOException {
394    
395        if (exists(f) && !overwrite) {
396          throw new FileAlreadyExistsException("File already exists: " + f);
397        }
398        
399        if(LOG.isDebugEnabled()) {
400          LOG.debug("Creating new file '" + f + "' in S3");
401        }
402        Path absolutePath = makeAbsolute(f);
403        String key = pathToKey(absolutePath);
404        return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
405            key, progress, bufferSize), statistics);
406      }
407      
408      @Override
409      public boolean delete(Path f, boolean recurse) throws IOException {
410        FileStatus status;
411        try {
412          status = getFileStatus(f);
413        } catch (FileNotFoundException e) {
414          if(LOG.isDebugEnabled()) {
415            LOG.debug("Delete called for '" + f +
416                "' but file does not exist, so returning false");
417          }
418          return false;
419        }
420        Path absolutePath = makeAbsolute(f);
421        String key = pathToKey(absolutePath);
422        if (status.isDirectory()) {
423          if (!recurse && listStatus(f).length > 0) {
424            throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
425          }
426    
427          createParent(f);
428    
429          if(LOG.isDebugEnabled()) {
430            LOG.debug("Deleting directory '" + f  + "'");
431          }
432          String priorLastKey = null;
433          do {
434            PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
435            for (FileMetadata file : listing.getFiles()) {
436              store.delete(file.getKey());
437            }
438            priorLastKey = listing.getPriorLastKey();
439          } while (priorLastKey != null);
440    
441          try {
442            store.delete(key + FOLDER_SUFFIX);
443          } catch (FileNotFoundException e) {
444            //this is fine, we don't require a marker
445          }
446        } else {
447          if(LOG.isDebugEnabled()) {
448            LOG.debug("Deleting file '" + f + "'");
449          }
450          createParent(f);
451          store.delete(key);
452        }
453        return true;
454      }
455    
456      @Override
457      public FileStatus getFileStatus(Path f) throws IOException {
458        Path absolutePath = makeAbsolute(f);
459        String key = pathToKey(absolutePath);
460        
461        if (key.length() == 0) { // root always exists
462          return newDirectory(absolutePath);
463        }
464        
465        if(LOG.isDebugEnabled()) {
466          LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
467        }
468        FileMetadata meta = store.retrieveMetadata(key);
469        if (meta != null) {
470          if(LOG.isDebugEnabled()) {
471            LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
472          }
473          return newFile(meta, absolutePath);
474        }
475        if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
476          if(LOG.isDebugEnabled()) {
477            LOG.debug("getFileStatus returning 'directory' for key '" + key +
478                "' as '" + key + FOLDER_SUFFIX + "' exists");
479          }
480          return newDirectory(absolutePath);
481        }
482        
483        if(LOG.isDebugEnabled()) {
484          LOG.debug("getFileStatus listing key '" + key + "'");
485        }
486        PartialListing listing = store.list(key, 1);
487        if (listing.getFiles().length > 0 ||
488            listing.getCommonPrefixes().length > 0) {
489          if(LOG.isDebugEnabled()) {
490            LOG.debug("getFileStatus returning 'directory' for key '" + key +
491                "' as it has contents");
492          }
493          return newDirectory(absolutePath);
494        }
495        
496        if(LOG.isDebugEnabled()) {
497          LOG.debug("getFileStatus could not find key '" + key + "'");
498        }
499        throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
500      }
501    
502      @Override
503      public URI getUri() {
504        return uri;
505      }
506    
507      /**
508       * <p>
509       * If <code>f</code> is a file, this method will make a single call to S3.
510       * If <code>f</code> is a directory, this method will make a maximum of
511       * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
512       * files and directories contained directly in <code>f</code>.
513       * </p>
514       */
515      @Override
516      public FileStatus[] listStatus(Path f) throws IOException {
517    
518        Path absolutePath = makeAbsolute(f);
519        String key = pathToKey(absolutePath);
520        
521        if (key.length() > 0) {
522          FileMetadata meta = store.retrieveMetadata(key);
523          if (meta != null) {
524            return new FileStatus[] { newFile(meta, absolutePath) };
525          }
526        }
527        
528        URI pathUri = absolutePath.toUri();
529        Set<FileStatus> status = new TreeSet<FileStatus>();
530        String priorLastKey = null;
531        do {
532          PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
533          for (FileMetadata fileMetadata : listing.getFiles()) {
534            Path subpath = keyToPath(fileMetadata.getKey());
535            String relativePath = pathUri.relativize(subpath.toUri()).getPath();
536    
537            if (fileMetadata.getKey().equals(key + "/")) {
538              // this is just the directory we have been asked to list
539            }
540            else if (relativePath.endsWith(FOLDER_SUFFIX)) {
541              status.add(newDirectory(new Path(
542                  absolutePath,
543                  relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
544            }
545            else {
546              status.add(newFile(fileMetadata, subpath));
547            }
548          }
549          for (String commonPrefix : listing.getCommonPrefixes()) {
550            Path subpath = keyToPath(commonPrefix);
551            String relativePath = pathUri.relativize(subpath.toUri()).getPath();
552            status.add(newDirectory(new Path(absolutePath, relativePath)));
553          }
554          priorLastKey = listing.getPriorLastKey();
555        } while (priorLastKey != null);
556        
557        if (status.isEmpty() &&
558            key.length() > 0 &&
559            store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
560          throw new FileNotFoundException("File " + f + " does not exist.");
561        }
562        
563        return status.toArray(new FileStatus[status.size()]);
564      }
565      
566      private FileStatus newFile(FileMetadata meta, Path path) {
567        return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
568            meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
569      }
570      
571      private FileStatus newDirectory(Path path) {
572        return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
573      }
574    
575      @Override
576      public boolean mkdirs(Path f, FsPermission permission) throws IOException {
577        Path absolutePath = makeAbsolute(f);
578        List<Path> paths = new ArrayList<Path>();
579        do {
580          paths.add(0, absolutePath);
581          absolutePath = absolutePath.getParent();
582        } while (absolutePath != null);
583        
584        boolean result = true;
585        for (Path path : paths) {
586          result &= mkdir(path);
587        }
588        return result;
589      }
590      
591      private boolean mkdir(Path f) throws IOException {
592        try {
593          FileStatus fileStatus = getFileStatus(f);
594          if (fileStatus.isFile()) {
595            throw new FileAlreadyExistsException(String.format(
596                "Can't make directory for path '%s' since it is a file.", f));
597    
598          }
599        } catch (FileNotFoundException e) {
600          if(LOG.isDebugEnabled()) {
601            LOG.debug("Making dir '" + f + "' in S3");
602          }
603          String key = pathToKey(f) + FOLDER_SUFFIX;
604          store.storeEmptyFile(key);    
605        }
606        return true;
607      }
608    
609      @Override
610      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
611        FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
612        if (fs.isDirectory()) {
613          throw new FileNotFoundException("'" + f + "' is a directory");
614        }
615        LOG.info("Opening '" + f + "' for reading");
616        Path absolutePath = makeAbsolute(f);
617        String key = pathToKey(absolutePath);
618        return new FSDataInputStream(new BufferedFSInputStream(
619            new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
620      }
621      
622      // rename() and delete() use this method to ensure that the parent directory
623      // of the source does not vanish.
624      private void createParent(Path path) throws IOException {
625        Path parent = path.getParent();
626        if (parent != null) {
627          String key = pathToKey(makeAbsolute(parent));
628          if (key.length() > 0) {
629              store.storeEmptyFile(key + FOLDER_SUFFIX);
630          }
631        }
632      }
633      
634        
635      @Override
636      public boolean rename(Path src, Path dst) throws IOException {
637    
638        String srcKey = pathToKey(makeAbsolute(src));
639    
640        if (srcKey.length() == 0) {
641          // Cannot rename root of file system
642          return false;
643        }
644    
645        final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
646    
647        // Figure out the final destination
648        String dstKey;
649        try {
650          boolean dstIsFile = getFileStatus(dst).isFile();
651          if (dstIsFile) {
652            if(LOG.isDebugEnabled()) {
653              LOG.debug(debugPreamble +
654                  "returning false as dst is an already existing file");
655            }
656            return false;
657          } else {
658            if(LOG.isDebugEnabled()) {
659              LOG.debug(debugPreamble + "using dst as output directory");
660            }
661            dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
662          }
663        } catch (FileNotFoundException e) {
664          if(LOG.isDebugEnabled()) {
665            LOG.debug(debugPreamble + "using dst as output destination");
666          }
667          dstKey = pathToKey(makeAbsolute(dst));
668          try {
669            if (getFileStatus(dst.getParent()).isFile()) {
670              if(LOG.isDebugEnabled()) {
671                LOG.debug(debugPreamble +
672                    "returning false as dst parent exists and is a file");
673              }
674              return false;
675            }
676          } catch (FileNotFoundException ex) {
677            if(LOG.isDebugEnabled()) {
678              LOG.debug(debugPreamble +
679                  "returning false as dst parent does not exist");
680            }
681            return false;
682          }
683        }
684    
685        boolean srcIsFile;
686        try {
687          srcIsFile = getFileStatus(src).isFile();
688        } catch (FileNotFoundException e) {
689          if(LOG.isDebugEnabled()) {
690            LOG.debug(debugPreamble + "returning false as src does not exist");
691          }
692          return false;
693        }
694        if (srcIsFile) {
695          if(LOG.isDebugEnabled()) {
696            LOG.debug(debugPreamble +
697                "src is file, so doing copy then delete in S3");
698          }
699          store.copy(srcKey, dstKey);
700          store.delete(srcKey);
701        } else {
702          if(LOG.isDebugEnabled()) {
703            LOG.debug(debugPreamble + "src is directory, so copying contents");
704          }
705          store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
706    
707          List<String> keysToDelete = new ArrayList<String>();
708          String priorLastKey = null;
709          do {
710            PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
711            for (FileMetadata file : listing.getFiles()) {
712              keysToDelete.add(file.getKey());
713              store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
714            }
715            priorLastKey = listing.getPriorLastKey();
716          } while (priorLastKey != null);
717    
718          if(LOG.isDebugEnabled()) {
719            LOG.debug(debugPreamble +
720                "all files in src copied, now removing src files");
721          }
722          for (String key: keysToDelete) {
723            store.delete(key);
724          }
725    
726          try {
727            store.delete(srcKey + FOLDER_SUFFIX);
728          } catch (FileNotFoundException e) {
729            //this is fine, we don't require a marker
730          }
731          if(LOG.isDebugEnabled()) {
732            LOG.debug(debugPreamble + "done");
733          }
734        }
735    
736        return true;
737      }
738      
739      @Override
740      public long getDefaultBlockSize() {
741        return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
742      }
743    
744      /**
745       * Set the working directory to the given directory.
746       */
747      @Override
748      public void setWorkingDirectory(Path newDir) {
749        workingDir = newDir;
750      }
751      
752      @Override
753      public Path getWorkingDirectory() {
754        return workingDir;
755      }
756    
757      @Override
758      public String getCanonicalServiceName() {
759        // Does not support Token
760        return null;
761      }
762    }