001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs.s3native;
020    
021    import java.io.BufferedOutputStream;
022    import java.io.EOFException;
023    import java.io.File;
024    import java.io.FileNotFoundException;
025    import java.io.FileOutputStream;
026    import java.io.IOException;
027    import java.io.InputStream;
028    import java.io.OutputStream;
029    import java.net.URI;
030    import java.security.DigestOutputStream;
031    import java.security.MessageDigest;
032    import java.security.NoSuchAlgorithmException;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.Set;
038    import java.util.TreeSet;
039    import java.util.concurrent.TimeUnit;
040    
041    import com.google.common.base.Preconditions;
042    import org.apache.hadoop.classification.InterfaceAudience;
043    import org.apache.hadoop.classification.InterfaceStability;
044    import org.apache.hadoop.conf.Configuration;
045    import org.apache.hadoop.fs.BufferedFSInputStream;
046    import org.apache.hadoop.fs.FSDataInputStream;
047    import org.apache.hadoop.fs.FSDataOutputStream;
048    import org.apache.hadoop.fs.FSExceptionMessages;
049    import org.apache.hadoop.fs.FSInputStream;
050    import org.apache.hadoop.fs.FileAlreadyExistsException;
051    import org.apache.hadoop.fs.FileStatus;
052    import org.apache.hadoop.fs.FileSystem;
053    import org.apache.hadoop.fs.Path;
054    import org.apache.hadoop.fs.permission.FsPermission;
055    import org.apache.hadoop.fs.s3.S3Exception;
056    import org.apache.hadoop.io.retry.RetryPolicies;
057    import org.apache.hadoop.io.retry.RetryPolicy;
058    import org.apache.hadoop.io.retry.RetryProxy;
059    import org.apache.hadoop.util.Progressable;
060    import org.slf4j.Logger;
061    import org.slf4j.LoggerFactory;
062    
063    /**
064     * <p>
065     * A {@link FileSystem} for reading and writing files stored on
066     * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
067     * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
068     * stores files on S3 in their
069     * native form so they can be read by other S3 tools.
070     *
071     * A note about directories. S3 of course has no "native" support for them.
072     * The idiom we choose then is: for any directory created by this class,
073     * we use an empty object "#{dirpath}_$folder$" as a marker.
074     * Further, to interoperate with other S3 tools, we also accept the following:
075     *  - an object "#{dirpath}/" denoting a directory marker
076     *  - if there exist any objects with the prefix "#{dirpath}/", then the
077     *    directory is said to exist
078     *  - if both a file with the name of a directory and a marker for that
079     *    directory exist, then the *file masks the directory*, and the directory
080     *    is never returned.
081     * </p>
082     * @see org.apache.hadoop.fs.s3.S3FileSystem
083     */
084    @InterfaceAudience.Public
085    @InterfaceStability.Stable
086    public class NativeS3FileSystem extends FileSystem {
087      
088      public static final Logger LOG =
089          LoggerFactory.getLogger(NativeS3FileSystem.class);
090      
091      private static final String FOLDER_SUFFIX = "_$folder$";
092      static final String PATH_DELIMITER = Path.SEPARATOR;
093      private static final int S3_MAX_LISTING_LENGTH = 1000;
094      
095      static class NativeS3FsInputStream extends FSInputStream {
096        
097        private NativeFileSystemStore store;
098        private Statistics statistics;
099        private InputStream in;
100        private final String key;
101        private long pos = 0;
102        
103        public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
104          Preconditions.checkNotNull(in, "Null input stream");
105          this.store = store;
106          this.statistics = statistics;
107          this.in = in;
108          this.key = key;
109        }
110        
111        @Override
112        public synchronized int read() throws IOException {
113          int result;
114          try {
115            result = in.read();
116          } catch (IOException e) {
117            LOG.info("Received IOException while reading '{}', attempting to reopen",
118                key);
119            LOG.debug("{}", e, e);
120            try {
121              seek(pos);
122              result = in.read();
123            } catch (EOFException eof) {
124              LOG.debug("EOF on input stream read: {}", eof, eof);
125              result = -1;
126            }
127          } 
128          if (result != -1) {
129            pos++;
130          }
131          if (statistics != null && result != -1) {
132            statistics.incrementBytesRead(1);
133          }
134          return result;
135        }
136        @Override
137        public synchronized int read(byte[] b, int off, int len)
138          throws IOException {
139          if (in == null) {
140            throw new EOFException("Cannot read closed stream");
141          }
142          int result = -1;
143          try {
144            result = in.read(b, off, len);
145          } catch (EOFException eof) {
146            throw eof;
147          } catch (IOException e) {
148            LOG.info( "Received IOException while reading '{}'," +
149                      " attempting to reopen.", key);
150            seek(pos);
151            result = in.read(b, off, len);
152          }
153          if (result > 0) {
154            pos += result;
155          }
156          if (statistics != null && result > 0) {
157            statistics.incrementBytesRead(result);
158          }
159          return result;
160        }
161    
162        @Override
163        public synchronized void close() throws IOException {
164          closeInnerStream();
165        }
166    
167        /**
168         * Close the inner stream if not null. Even if an exception
169         * is raised during the close, the field is set to null
170         * @throws IOException if raised by the close() operation.
171         */
172        private void closeInnerStream() throws IOException {
173          if (in != null) {
174            try {
175              in.close();
176            } finally {
177              in = null;
178            }
179          }
180        }
181    
182        /**
183         * Update inner stream with a new stream and position
184         * @param newStream new stream -must not be null
185         * @param newpos new position
186         * @throws IOException IO exception on a failure to close the existing
187         * stream.
188         */
189        private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
190          Preconditions.checkNotNull(newStream, "Null newstream argument");
191          closeInnerStream();
192          in = newStream;
193          this.pos = newpos;
194        }
195    
196        @Override
197        public synchronized void seek(long newpos) throws IOException {
198          if (newpos < 0) {
199            throw new EOFException(
200                FSExceptionMessages.NEGATIVE_SEEK);
201          }
202          if (pos != newpos) {
203            // the seek is attempting to move the current position
204            LOG.debug("Opening key '{}' for reading at position '{}", key, newpos);
205            InputStream newStream = store.retrieve(key, newpos);
206            updateInnerStream(newStream, newpos);
207          }
208        }
209    
210        @Override
211        public synchronized long getPos() throws IOException {
212          return pos;
213        }
214        @Override
215        public boolean seekToNewSource(long targetPos) throws IOException {
216          return false;
217        }
218      }
219      
220      private class NativeS3FsOutputStream extends OutputStream {
221        
222        private Configuration conf;
223        private String key;
224        private File backupFile;
225        private OutputStream backupStream;
226        private MessageDigest digest;
227        private boolean closed;
228        
229        public NativeS3FsOutputStream(Configuration conf,
230            NativeFileSystemStore store, String key, Progressable progress,
231            int bufferSize) throws IOException {
232          this.conf = conf;
233          this.key = key;
234          this.backupFile = newBackupFile();
235          LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
236          try {
237            this.digest = MessageDigest.getInstance("MD5");
238            this.backupStream = new BufferedOutputStream(new DigestOutputStream(
239                new FileOutputStream(backupFile), this.digest));
240          } catch (NoSuchAlgorithmException e) {
241            LOG.warn("Cannot load MD5 digest algorithm," +
242                "skipping message integrity check.", e);
243            this.backupStream = new BufferedOutputStream(
244                new FileOutputStream(backupFile));
245          }
246        }
247    
248        private File newBackupFile() throws IOException {
249          File dir = new File(conf.get("fs.s3.buffer.dir"));
250          if (!dir.mkdirs() && !dir.exists()) {
251            throw new IOException("Cannot create S3 buffer directory: " + dir);
252          }
253          File result = File.createTempFile("output-", ".tmp", dir);
254          result.deleteOnExit();
255          return result;
256        }
257        
258        @Override
259        public void flush() throws IOException {
260          backupStream.flush();
261        }
262        
263        @Override
264        public synchronized void close() throws IOException {
265          if (closed) {
266            return;
267          }
268    
269          backupStream.close();
270          LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
271          
272          try {
273            byte[] md5Hash = digest == null ? null : digest.digest();
274            store.storeFile(key, backupFile, md5Hash);
275          } finally {
276            if (!backupFile.delete()) {
277              LOG.warn("Could not delete temporary s3n file: " + backupFile);
278            }
279            super.close();
280            closed = true;
281          } 
282          LOG.info("OutputStream for key '{}' upload complete", key);
283        }
284    
285        @Override
286        public void write(int b) throws IOException {
287          backupStream.write(b);
288        }
289    
290        @Override
291        public void write(byte[] b, int off, int len) throws IOException {
292          backupStream.write(b, off, len);
293        }
294      }
295      
296      private URI uri;
297      private NativeFileSystemStore store;
298      private Path workingDir;
299      
300      public NativeS3FileSystem() {
301        // set store in initialize()
302      }
303      
304      public NativeS3FileSystem(NativeFileSystemStore store) {
305        this.store = store;
306      }
307    
308      /**
309       * Return the protocol scheme for the FileSystem.
310       * <p/>
311       *
312       * @return <code>s3n</code>
313       */
314      @Override
315      public String getScheme() {
316        return "s3n";
317      }
318    
319      @Override
320      public void initialize(URI uri, Configuration conf) throws IOException {
321        super.initialize(uri, conf);
322        if (store == null) {
323          store = createDefaultStore(conf);
324        }
325        store.initialize(uri, conf);
326        setConf(conf);
327        this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
328        this.workingDir =
329          new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
330      }
331      
332      private static NativeFileSystemStore createDefaultStore(Configuration conf) {
333        NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
334        
335        RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
336            conf.getInt("fs.s3.maxRetries", 4),
337            conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
338        Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
339          new HashMap<Class<? extends Exception>, RetryPolicy>();
340        exceptionToPolicyMap.put(IOException.class, basePolicy);
341        exceptionToPolicyMap.put(S3Exception.class, basePolicy);
342        
343        RetryPolicy methodPolicy = RetryPolicies.retryByException(
344            RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
345        Map<String, RetryPolicy> methodNameToPolicyMap =
346          new HashMap<String, RetryPolicy>();
347        methodNameToPolicyMap.put("storeFile", methodPolicy);
348        methodNameToPolicyMap.put("rename", methodPolicy);
349        
350        return (NativeFileSystemStore)
351          RetryProxy.create(NativeFileSystemStore.class, store,
352              methodNameToPolicyMap);
353      }
354      
355      private static String pathToKey(Path path) {
356        if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
357          // allow uris without trailing slash after bucket to refer to root,
358          // like s3n://mybucket
359          return "";
360        }
361        if (!path.isAbsolute()) {
362          throw new IllegalArgumentException("Path must be absolute: " + path);
363        }
364        String ret = path.toUri().getPath().substring(1); // remove initial slash
365        if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
366          ret = ret.substring(0, ret.length() -1);
367      }
368        return ret;
369      }
370      
371      private static Path keyToPath(String key) {
372        return new Path("/" + key);
373      }
374      
375      private Path makeAbsolute(Path path) {
376        if (path.isAbsolute()) {
377          return path;
378        }
379        return new Path(workingDir, path);
380      }
381    
382      /** This optional operation is not yet supported. */
383      @Override
384      public FSDataOutputStream append(Path f, int bufferSize,
385          Progressable progress) throws IOException {
386        throw new IOException("Not supported");
387      }
388      
389      @Override
390      public FSDataOutputStream create(Path f, FsPermission permission,
391          boolean overwrite, int bufferSize, short replication, long blockSize,
392          Progressable progress) throws IOException {
393    
394        if (exists(f) && !overwrite) {
395          throw new FileAlreadyExistsException("File already exists: " + f);
396        }
397        
398        if(LOG.isDebugEnabled()) {
399          LOG.debug("Creating new file '" + f + "' in S3");
400        }
401        Path absolutePath = makeAbsolute(f);
402        String key = pathToKey(absolutePath);
403        return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
404            key, progress, bufferSize), statistics);
405      }
406      
407      @Override
408      public boolean delete(Path f, boolean recurse) throws IOException {
409        FileStatus status;
410        try {
411          status = getFileStatus(f);
412        } catch (FileNotFoundException e) {
413          if(LOG.isDebugEnabled()) {
414            LOG.debug("Delete called for '" + f +
415                "' but file does not exist, so returning false");
416          }
417          return false;
418        }
419        Path absolutePath = makeAbsolute(f);
420        String key = pathToKey(absolutePath);
421        if (status.isDirectory()) {
422          if (!recurse && listStatus(f).length > 0) {
423            throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
424          }
425    
426          createParent(f);
427    
428          if(LOG.isDebugEnabled()) {
429            LOG.debug("Deleting directory '" + f  + "'");
430          }
431          String priorLastKey = null;
432          do {
433            PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
434            for (FileMetadata file : listing.getFiles()) {
435              store.delete(file.getKey());
436            }
437            priorLastKey = listing.getPriorLastKey();
438          } while (priorLastKey != null);
439    
440          try {
441            store.delete(key + FOLDER_SUFFIX);
442          } catch (FileNotFoundException e) {
443            //this is fine, we don't require a marker
444          }
445        } else {
446          if(LOG.isDebugEnabled()) {
447            LOG.debug("Deleting file '" + f + "'");
448          }
449          createParent(f);
450          store.delete(key);
451        }
452        return true;
453      }
454    
455      @Override
456      public FileStatus getFileStatus(Path f) throws IOException {
457        Path absolutePath = makeAbsolute(f);
458        String key = pathToKey(absolutePath);
459        
460        if (key.length() == 0) { // root always exists
461          return newDirectory(absolutePath);
462        }
463        
464        if(LOG.isDebugEnabled()) {
465          LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
466        }
467        FileMetadata meta = store.retrieveMetadata(key);
468        if (meta != null) {
469          if(LOG.isDebugEnabled()) {
470            LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
471          }
472          return newFile(meta, absolutePath);
473        }
474        if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
475          if(LOG.isDebugEnabled()) {
476            LOG.debug("getFileStatus returning 'directory' for key '" + key +
477                "' as '" + key + FOLDER_SUFFIX + "' exists");
478          }
479          return newDirectory(absolutePath);
480        }
481        
482        if(LOG.isDebugEnabled()) {
483          LOG.debug("getFileStatus listing key '" + key + "'");
484        }
485        PartialListing listing = store.list(key, 1);
486        if (listing.getFiles().length > 0 ||
487            listing.getCommonPrefixes().length > 0) {
488          if(LOG.isDebugEnabled()) {
489            LOG.debug("getFileStatus returning 'directory' for key '" + key +
490                "' as it has contents");
491          }
492          return newDirectory(absolutePath);
493        }
494        
495        if(LOG.isDebugEnabled()) {
496          LOG.debug("getFileStatus could not find key '" + key + "'");
497        }
498        throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
499      }
500    
501      @Override
502      public URI getUri() {
503        return uri;
504      }
505    
506      /**
507       * <p>
508       * If <code>f</code> is a file, this method will make a single call to S3.
509       * If <code>f</code> is a directory, this method will make a maximum of
510       * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
511       * files and directories contained directly in <code>f</code>.
512       * </p>
513       */
514      @Override
515      public FileStatus[] listStatus(Path f) throws IOException {
516    
517        Path absolutePath = makeAbsolute(f);
518        String key = pathToKey(absolutePath);
519        
520        if (key.length() > 0) {
521          FileMetadata meta = store.retrieveMetadata(key);
522          if (meta != null) {
523            return new FileStatus[] { newFile(meta, absolutePath) };
524          }
525        }
526        
527        URI pathUri = absolutePath.toUri();
528        Set<FileStatus> status = new TreeSet<FileStatus>();
529        String priorLastKey = null;
530        do {
531          PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
532          for (FileMetadata fileMetadata : listing.getFiles()) {
533            Path subpath = keyToPath(fileMetadata.getKey());
534            String relativePath = pathUri.relativize(subpath.toUri()).getPath();
535    
536            if (fileMetadata.getKey().equals(key + "/")) {
537              // this is just the directory we have been asked to list
538            }
539            else if (relativePath.endsWith(FOLDER_SUFFIX)) {
540              status.add(newDirectory(new Path(
541                  absolutePath,
542                  relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
543            }
544            else {
545              status.add(newFile(fileMetadata, subpath));
546            }
547          }
548          for (String commonPrefix : listing.getCommonPrefixes()) {
549            Path subpath = keyToPath(commonPrefix);
550            String relativePath = pathUri.relativize(subpath.toUri()).getPath();
551            status.add(newDirectory(new Path(absolutePath, relativePath)));
552          }
553          priorLastKey = listing.getPriorLastKey();
554        } while (priorLastKey != null);
555        
556        if (status.isEmpty() &&
557            key.length() > 0 &&
558            store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
559          throw new FileNotFoundException("File " + f + " does not exist.");
560        }
561        
562        return status.toArray(new FileStatus[status.size()]);
563      }
564      
565      private FileStatus newFile(FileMetadata meta, Path path) {
566        return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
567            meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
568      }
569      
570      private FileStatus newDirectory(Path path) {
571        return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
572      }
573    
574      @Override
575      public boolean mkdirs(Path f, FsPermission permission) throws IOException {
576        Path absolutePath = makeAbsolute(f);
577        List<Path> paths = new ArrayList<Path>();
578        do {
579          paths.add(0, absolutePath);
580          absolutePath = absolutePath.getParent();
581        } while (absolutePath != null);
582        
583        boolean result = true;
584        for (Path path : paths) {
585          result &= mkdir(path);
586        }
587        return result;
588      }
589      
590      private boolean mkdir(Path f) throws IOException {
591        try {
592          FileStatus fileStatus = getFileStatus(f);
593          if (fileStatus.isFile()) {
594            throw new FileAlreadyExistsException(String.format(
595                "Can't make directory for path '%s' since it is a file.", f));
596    
597          }
598        } catch (FileNotFoundException e) {
599          if(LOG.isDebugEnabled()) {
600            LOG.debug("Making dir '" + f + "' in S3");
601          }
602          String key = pathToKey(f) + FOLDER_SUFFIX;
603          store.storeEmptyFile(key);    
604        }
605        return true;
606      }
607    
608      @Override
609      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
610        FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
611        if (fs.isDirectory()) {
612          throw new FileNotFoundException("'" + f + "' is a directory");
613        }
614        LOG.info("Opening '" + f + "' for reading");
615        Path absolutePath = makeAbsolute(f);
616        String key = pathToKey(absolutePath);
617        return new FSDataInputStream(new BufferedFSInputStream(
618            new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
619      }
620      
621      // rename() and delete() use this method to ensure that the parent directory
622      // of the source does not vanish.
623      private void createParent(Path path) throws IOException {
624        Path parent = path.getParent();
625        if (parent != null) {
626          String key = pathToKey(makeAbsolute(parent));
627          if (key.length() > 0) {
628              store.storeEmptyFile(key + FOLDER_SUFFIX);
629          }
630        }
631      }
632      
633        
634      @Override
635      public boolean rename(Path src, Path dst) throws IOException {
636    
637        String srcKey = pathToKey(makeAbsolute(src));
638    
639        if (srcKey.length() == 0) {
640          // Cannot rename root of file system
641          return false;
642        }
643    
644        final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
645    
646        // Figure out the final destination
647        String dstKey;
648        try {
649          boolean dstIsFile = getFileStatus(dst).isFile();
650          if (dstIsFile) {
651            if(LOG.isDebugEnabled()) {
652              LOG.debug(debugPreamble +
653                  "returning false as dst is an already existing file");
654            }
655            return false;
656          } else {
657            if(LOG.isDebugEnabled()) {
658              LOG.debug(debugPreamble + "using dst as output directory");
659            }
660            dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
661          }
662        } catch (FileNotFoundException e) {
663          if(LOG.isDebugEnabled()) {
664            LOG.debug(debugPreamble + "using dst as output destination");
665          }
666          dstKey = pathToKey(makeAbsolute(dst));
667          try {
668            if (getFileStatus(dst.getParent()).isFile()) {
669              if(LOG.isDebugEnabled()) {
670                LOG.debug(debugPreamble +
671                    "returning false as dst parent exists and is a file");
672              }
673              return false;
674            }
675          } catch (FileNotFoundException ex) {
676            if(LOG.isDebugEnabled()) {
677              LOG.debug(debugPreamble +
678                  "returning false as dst parent does not exist");
679            }
680            return false;
681          }
682        }
683    
684        boolean srcIsFile;
685        try {
686          srcIsFile = getFileStatus(src).isFile();
687        } catch (FileNotFoundException e) {
688          if(LOG.isDebugEnabled()) {
689            LOG.debug(debugPreamble + "returning false as src does not exist");
690          }
691          return false;
692        }
693        if (srcIsFile) {
694          if(LOG.isDebugEnabled()) {
695            LOG.debug(debugPreamble +
696                "src is file, so doing copy then delete in S3");
697          }
698          store.copy(srcKey, dstKey);
699          store.delete(srcKey);
700        } else {
701          if(LOG.isDebugEnabled()) {
702            LOG.debug(debugPreamble + "src is directory, so copying contents");
703          }
704          store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
705    
706          List<String> keysToDelete = new ArrayList<String>();
707          String priorLastKey = null;
708          do {
709            PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
710            for (FileMetadata file : listing.getFiles()) {
711              keysToDelete.add(file.getKey());
712              store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
713            }
714            priorLastKey = listing.getPriorLastKey();
715          } while (priorLastKey != null);
716    
717          if(LOG.isDebugEnabled()) {
718            LOG.debug(debugPreamble +
719                "all files in src copied, now removing src files");
720          }
721          for (String key: keysToDelete) {
722            store.delete(key);
723          }
724    
725          try {
726            store.delete(srcKey + FOLDER_SUFFIX);
727          } catch (FileNotFoundException e) {
728            //this is fine, we don't require a marker
729          }
730          if(LOG.isDebugEnabled()) {
731            LOG.debug(debugPreamble + "done");
732          }
733        }
734    
735        return true;
736      }
737      
738      @Override
739      public long getDefaultBlockSize() {
740        return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
741      }
742    
743      /**
744       * Set the working directory to the given directory.
745       */
746      @Override
747      public void setWorkingDirectory(Path newDir) {
748        workingDir = newDir;
749      }
750      
751      @Override
752      public Path getWorkingDirectory() {
753        return workingDir;
754      }
755    
756      @Override
757      public String getCanonicalServiceName() {
758        // Does not support Token
759        return null;
760      }
761    }