001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs.s3native;
020
021 import java.io.BufferedOutputStream;
022 import java.io.EOFException;
023 import java.io.File;
024 import java.io.FileNotFoundException;
025 import java.io.FileOutputStream;
026 import java.io.IOException;
027 import java.io.InputStream;
028 import java.io.OutputStream;
029 import java.net.URI;
030 import java.security.DigestOutputStream;
031 import java.security.MessageDigest;
032 import java.security.NoSuchAlgorithmException;
033 import java.util.ArrayList;
034 import java.util.HashMap;
035 import java.util.List;
036 import java.util.Map;
037 import java.util.Set;
038 import java.util.TreeSet;
039 import java.util.concurrent.TimeUnit;
040
041 import com.google.common.base.Preconditions;
042 import org.apache.hadoop.classification.InterfaceAudience;
043 import org.apache.hadoop.classification.InterfaceStability;
044 import org.apache.hadoop.conf.Configuration;
045 import org.apache.hadoop.fs.BufferedFSInputStream;
046 import org.apache.hadoop.fs.FSDataInputStream;
047 import org.apache.hadoop.fs.FSDataOutputStream;
048 import org.apache.hadoop.fs.FSExceptionMessages;
049 import org.apache.hadoop.fs.FSInputStream;
050 import org.apache.hadoop.fs.FileAlreadyExistsException;
051 import org.apache.hadoop.fs.FileStatus;
052 import org.apache.hadoop.fs.FileSystem;
053 import org.apache.hadoop.fs.Path;
054 import org.apache.hadoop.fs.permission.FsPermission;
055 import org.apache.hadoop.fs.s3.S3Exception;
056 import org.apache.hadoop.io.retry.RetryPolicies;
057 import org.apache.hadoop.io.retry.RetryPolicy;
058 import org.apache.hadoop.io.retry.RetryProxy;
059 import org.apache.hadoop.util.Progressable;
060 import org.slf4j.Logger;
061 import org.slf4j.LoggerFactory;
062
063 /**
064 * <p>
065 * A {@link FileSystem} for reading and writing files stored on
066 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
067 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
068 * stores files on S3 in their
069 * native form so they can be read by other S3 tools.
070 *
071 * A note about directories. S3 of course has no "native" support for them.
072 * The idiom we choose then is: for any directory created by this class,
073 * we use an empty object "#{dirpath}_$folder$" as a marker.
074 * Further, to interoperate with other S3 tools, we also accept the following:
 * - an object "#{dirpath}/" denoting a directory marker
 * - if there exist any objects with the prefix "#{dirpath}/", then the
 * directory is said to exist
 * - if both a file with the name of a directory and a marker for that
 * directory exist, then the *file masks the directory*, and the directory
 * is never returned.
081 * </p>
082 * @see org.apache.hadoop.fs.s3.S3FileSystem
083 */
084 @InterfaceAudience.Public
085 @InterfaceStability.Stable
086 public class NativeS3FileSystem extends FileSystem {
087
/** Logger used by this filesystem and by its nested stream classes. */
public static final Logger LOG =
    LoggerFactory.getLogger(NativeS3FileSystem.class);

/** Suffix appended to a directory's key to name its empty marker object. */
private static final String FOLDER_SUFFIX = "_$folder$";
/** Separator between path elements; same value as {@link Path#SEPARATOR}. */
static final String PATH_DELIMITER = Path.SEPARATOR;
/** Listing length passed to the store on each paged listing call. */
private static final int S3_MAX_LISTING_LENGTH = 1000;
094
095 static class NativeS3FsInputStream extends FSInputStream {
096
097 private NativeFileSystemStore store;
098 private Statistics statistics;
099 private InputStream in;
100 private final String key;
101 private long pos = 0;
102
103 public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
104 Preconditions.checkNotNull(in, "Null input stream");
105 this.store = store;
106 this.statistics = statistics;
107 this.in = in;
108 this.key = key;
109 }
110
111 @Override
112 public synchronized int read() throws IOException {
113 int result;
114 try {
115 result = in.read();
116 } catch (IOException e) {
117 LOG.info("Received IOException while reading '{}', attempting to reopen",
118 key);
119 LOG.debug("{}", e, e);
120 try {
121 seek(pos);
122 result = in.read();
123 } catch (EOFException eof) {
124 LOG.debug("EOF on input stream read: {}", eof, eof);
125 result = -1;
126 }
127 }
128 if (result != -1) {
129 pos++;
130 }
131 if (statistics != null && result != -1) {
132 statistics.incrementBytesRead(1);
133 }
134 return result;
135 }
136 @Override
137 public synchronized int read(byte[] b, int off, int len)
138 throws IOException {
139 if (in == null) {
140 throw new EOFException("Cannot read closed stream");
141 }
142 int result = -1;
143 try {
144 result = in.read(b, off, len);
145 } catch (EOFException eof) {
146 throw eof;
147 } catch (IOException e) {
148 LOG.info( "Received IOException while reading '{}'," +
149 " attempting to reopen.", key);
150 seek(pos);
151 result = in.read(b, off, len);
152 }
153 if (result > 0) {
154 pos += result;
155 }
156 if (statistics != null && result > 0) {
157 statistics.incrementBytesRead(result);
158 }
159 return result;
160 }
161
162 @Override
163 public synchronized void close() throws IOException {
164 closeInnerStream();
165 }
166
167 /**
168 * Close the inner stream if not null. Even if an exception
169 * is raised during the close, the field is set to null
170 * @throws IOException if raised by the close() operation.
171 */
172 private void closeInnerStream() throws IOException {
173 if (in != null) {
174 try {
175 in.close();
176 } finally {
177 in = null;
178 }
179 }
180 }
181
182 /**
183 * Update inner stream with a new stream and position
184 * @param newStream new stream -must not be null
185 * @param newpos new position
186 * @throws IOException IO exception on a failure to close the existing
187 * stream.
188 */
189 private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
190 Preconditions.checkNotNull(newStream, "Null newstream argument");
191 closeInnerStream();
192 in = newStream;
193 this.pos = newpos;
194 }
195
196 @Override
197 public synchronized void seek(long newpos) throws IOException {
198 if (newpos < 0) {
199 throw new EOFException(
200 FSExceptionMessages.NEGATIVE_SEEK);
201 }
202 if (pos != newpos) {
203 // the seek is attempting to move the current position
204 LOG.debug("Opening key '{}' for reading at position '{}", key, newpos);
205 InputStream newStream = store.retrieve(key, newpos);
206 updateInnerStream(newStream, newpos);
207 }
208 }
209
210 @Override
211 public synchronized long getPos() throws IOException {
212 return pos;
213 }
214 @Override
215 public boolean seekToNewSource(long targetPos) throws IOException {
216 return false;
217 }
218 }
219
220 private class NativeS3FsOutputStream extends OutputStream {
221
222 private Configuration conf;
223 private String key;
224 private File backupFile;
225 private OutputStream backupStream;
226 private MessageDigest digest;
227 private boolean closed;
228
229 public NativeS3FsOutputStream(Configuration conf,
230 NativeFileSystemStore store, String key, Progressable progress,
231 int bufferSize) throws IOException {
232 this.conf = conf;
233 this.key = key;
234 this.backupFile = newBackupFile();
235 LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
236 try {
237 this.digest = MessageDigest.getInstance("MD5");
238 this.backupStream = new BufferedOutputStream(new DigestOutputStream(
239 new FileOutputStream(backupFile), this.digest));
240 } catch (NoSuchAlgorithmException e) {
241 LOG.warn("Cannot load MD5 digest algorithm," +
242 "skipping message integrity check.", e);
243 this.backupStream = new BufferedOutputStream(
244 new FileOutputStream(backupFile));
245 }
246 }
247
248 private File newBackupFile() throws IOException {
249 File dir = new File(conf.get("fs.s3.buffer.dir"));
250 if (!dir.mkdirs() && !dir.exists()) {
251 throw new IOException("Cannot create S3 buffer directory: " + dir);
252 }
253 File result = File.createTempFile("output-", ".tmp", dir);
254 result.deleteOnExit();
255 return result;
256 }
257
258 @Override
259 public void flush() throws IOException {
260 backupStream.flush();
261 }
262
263 @Override
264 public synchronized void close() throws IOException {
265 if (closed) {
266 return;
267 }
268
269 backupStream.close();
270 LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);
271
272 try {
273 byte[] md5Hash = digest == null ? null : digest.digest();
274 store.storeFile(key, backupFile, md5Hash);
275 } finally {
276 if (!backupFile.delete()) {
277 LOG.warn("Could not delete temporary s3n file: " + backupFile);
278 }
279 super.close();
280 closed = true;
281 }
282 LOG.info("OutputStream for key '{}' upload complete", key);
283 }
284
285 @Override
286 public void write(int b) throws IOException {
287 backupStream.write(b);
288 }
289
290 @Override
291 public void write(byte[] b, int off, int len) throws IOException {
292 backupStream.write(b, off, len);
293 }
294 }
295
/** Root URI of this filesystem (scheme and authority), set in initialize(). */
private URI uri;
/** Backing store; either injected via the constructor or created in initialize(). */
private NativeFileSystemStore store;
/** Directory against which relative paths are resolved. */
private Path workingDir;

/** Create an instance whose store is created later, in initialize(). */
public NativeS3FileSystem() {
  // set store in initialize()
}

/**
 * Create an instance backed by the given store.
 * @param store store to use instead of the default one
 */
public NativeS3FileSystem(NativeFileSystemStore store) {
  this.store = store;
}
307
/**
 * Return the protocol scheme for the FileSystem.
 *
 * @return <code>s3n</code>
 */
@Override
public String getScheme() {
  return "s3n";
}
318
319 @Override
320 public void initialize(URI uri, Configuration conf) throws IOException {
321 super.initialize(uri, conf);
322 if (store == null) {
323 store = createDefaultStore(conf);
324 }
325 store.initialize(uri, conf);
326 setConf(conf);
327 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
328 this.workingDir =
329 new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
330 }
331
332 private static NativeFileSystemStore createDefaultStore(Configuration conf) {
333 NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
334
335 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
336 conf.getInt("fs.s3.maxRetries", 4),
337 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
338 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
339 new HashMap<Class<? extends Exception>, RetryPolicy>();
340 exceptionToPolicyMap.put(IOException.class, basePolicy);
341 exceptionToPolicyMap.put(S3Exception.class, basePolicy);
342
343 RetryPolicy methodPolicy = RetryPolicies.retryByException(
344 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
345 Map<String, RetryPolicy> methodNameToPolicyMap =
346 new HashMap<String, RetryPolicy>();
347 methodNameToPolicyMap.put("storeFile", methodPolicy);
348 methodNameToPolicyMap.put("rename", methodPolicy);
349
350 return (NativeFileSystemStore)
351 RetryProxy.create(NativeFileSystemStore.class, store,
352 methodNameToPolicyMap);
353 }
354
355 private static String pathToKey(Path path) {
356 if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
357 // allow uris without trailing slash after bucket to refer to root,
358 // like s3n://mybucket
359 return "";
360 }
361 if (!path.isAbsolute()) {
362 throw new IllegalArgumentException("Path must be absolute: " + path);
363 }
364 String ret = path.toUri().getPath().substring(1); // remove initial slash
365 if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
366 ret = ret.substring(0, ret.length() -1);
367 }
368 return ret;
369 }
370
/**
 * Convert a store key into an absolute path by prefixing a slash.
 * @param key store key
 * @return the path for the key
 */
private static Path keyToPath(String key) {
  return new Path("/" + key);
}
374
375 private Path makeAbsolute(Path path) {
376 if (path.isAbsolute()) {
377 return path;
378 }
379 return new Path(workingDir, path);
380 }
381
/**
 * This optional operation is not yet supported.
 * @throws IOException always, with the message "Not supported"
 */
@Override
public FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException {
  throw new IOException("Not supported");
}
388
389 @Override
390 public FSDataOutputStream create(Path f, FsPermission permission,
391 boolean overwrite, int bufferSize, short replication, long blockSize,
392 Progressable progress) throws IOException {
393
394 if (exists(f) && !overwrite) {
395 throw new FileAlreadyExistsException("File already exists: " + f);
396 }
397
398 if(LOG.isDebugEnabled()) {
399 LOG.debug("Creating new file '" + f + "' in S3");
400 }
401 Path absolutePath = makeAbsolute(f);
402 String key = pathToKey(absolutePath);
403 return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
404 key, progress, bufferSize), statistics);
405 }
406
407 @Override
408 public boolean delete(Path f, boolean recurse) throws IOException {
409 FileStatus status;
410 try {
411 status = getFileStatus(f);
412 } catch (FileNotFoundException e) {
413 if(LOG.isDebugEnabled()) {
414 LOG.debug("Delete called for '" + f +
415 "' but file does not exist, so returning false");
416 }
417 return false;
418 }
419 Path absolutePath = makeAbsolute(f);
420 String key = pathToKey(absolutePath);
421 if (status.isDirectory()) {
422 if (!recurse && listStatus(f).length > 0) {
423 throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false");
424 }
425
426 createParent(f);
427
428 if(LOG.isDebugEnabled()) {
429 LOG.debug("Deleting directory '" + f + "'");
430 }
431 String priorLastKey = null;
432 do {
433 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
434 for (FileMetadata file : listing.getFiles()) {
435 store.delete(file.getKey());
436 }
437 priorLastKey = listing.getPriorLastKey();
438 } while (priorLastKey != null);
439
440 try {
441 store.delete(key + FOLDER_SUFFIX);
442 } catch (FileNotFoundException e) {
443 //this is fine, we don't require a marker
444 }
445 } else {
446 if(LOG.isDebugEnabled()) {
447 LOG.debug("Deleting file '" + f + "'");
448 }
449 createParent(f);
450 store.delete(key);
451 }
452 return true;
453 }
454
455 @Override
456 public FileStatus getFileStatus(Path f) throws IOException {
457 Path absolutePath = makeAbsolute(f);
458 String key = pathToKey(absolutePath);
459
460 if (key.length() == 0) { // root always exists
461 return newDirectory(absolutePath);
462 }
463
464 if(LOG.isDebugEnabled()) {
465 LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
466 }
467 FileMetadata meta = store.retrieveMetadata(key);
468 if (meta != null) {
469 if(LOG.isDebugEnabled()) {
470 LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
471 }
472 return newFile(meta, absolutePath);
473 }
474 if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
475 if(LOG.isDebugEnabled()) {
476 LOG.debug("getFileStatus returning 'directory' for key '" + key +
477 "' as '" + key + FOLDER_SUFFIX + "' exists");
478 }
479 return newDirectory(absolutePath);
480 }
481
482 if(LOG.isDebugEnabled()) {
483 LOG.debug("getFileStatus listing key '" + key + "'");
484 }
485 PartialListing listing = store.list(key, 1);
486 if (listing.getFiles().length > 0 ||
487 listing.getCommonPrefixes().length > 0) {
488 if(LOG.isDebugEnabled()) {
489 LOG.debug("getFileStatus returning 'directory' for key '" + key +
490 "' as it has contents");
491 }
492 return newDirectory(absolutePath);
493 }
494
495 if(LOG.isDebugEnabled()) {
496 LOG.debug("getFileStatus could not find key '" + key + "'");
497 }
498 throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
499 }
500
/**
 * @return the filesystem root URI (scheme and authority) set by
 * initialize(); null before initialization
 */
@Override
public URI getUri() {
  return uri;
}
505
506 /**
507 * <p>
508 * If <code>f</code> is a file, this method will make a single call to S3.
509 * If <code>f</code> is a directory, this method will make a maximum of
510 * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
511 * files and directories contained directly in <code>f</code>.
512 * </p>
513 */
514 @Override
515 public FileStatus[] listStatus(Path f) throws IOException {
516
517 Path absolutePath = makeAbsolute(f);
518 String key = pathToKey(absolutePath);
519
520 if (key.length() > 0) {
521 FileMetadata meta = store.retrieveMetadata(key);
522 if (meta != null) {
523 return new FileStatus[] { newFile(meta, absolutePath) };
524 }
525 }
526
527 URI pathUri = absolutePath.toUri();
528 Set<FileStatus> status = new TreeSet<FileStatus>();
529 String priorLastKey = null;
530 do {
531 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
532 for (FileMetadata fileMetadata : listing.getFiles()) {
533 Path subpath = keyToPath(fileMetadata.getKey());
534 String relativePath = pathUri.relativize(subpath.toUri()).getPath();
535
536 if (fileMetadata.getKey().equals(key + "/")) {
537 // this is just the directory we have been asked to list
538 }
539 else if (relativePath.endsWith(FOLDER_SUFFIX)) {
540 status.add(newDirectory(new Path(
541 absolutePath,
542 relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
543 }
544 else {
545 status.add(newFile(fileMetadata, subpath));
546 }
547 }
548 for (String commonPrefix : listing.getCommonPrefixes()) {
549 Path subpath = keyToPath(commonPrefix);
550 String relativePath = pathUri.relativize(subpath.toUri()).getPath();
551 status.add(newDirectory(new Path(absolutePath, relativePath)));
552 }
553 priorLastKey = listing.getPriorLastKey();
554 } while (priorLastKey != null);
555
556 if (status.isEmpty() &&
557 key.length() > 0 &&
558 store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
559 throw new FileNotFoundException("File " + f + " does not exist.");
560 }
561
562 return status.toArray(new FileStatus[status.size()]);
563 }
564
565 private FileStatus newFile(FileMetadata meta, Path path) {
566 return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
567 meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
568 }
569
570 private FileStatus newDirectory(Path path) {
571 return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
572 }
573
574 @Override
575 public boolean mkdirs(Path f, FsPermission permission) throws IOException {
576 Path absolutePath = makeAbsolute(f);
577 List<Path> paths = new ArrayList<Path>();
578 do {
579 paths.add(0, absolutePath);
580 absolutePath = absolutePath.getParent();
581 } while (absolutePath != null);
582
583 boolean result = true;
584 for (Path path : paths) {
585 result &= mkdir(path);
586 }
587 return result;
588 }
589
590 private boolean mkdir(Path f) throws IOException {
591 try {
592 FileStatus fileStatus = getFileStatus(f);
593 if (fileStatus.isFile()) {
594 throw new FileAlreadyExistsException(String.format(
595 "Can't make directory for path '%s' since it is a file.", f));
596
597 }
598 } catch (FileNotFoundException e) {
599 if(LOG.isDebugEnabled()) {
600 LOG.debug("Making dir '" + f + "' in S3");
601 }
602 String key = pathToKey(f) + FOLDER_SUFFIX;
603 store.storeEmptyFile(key);
604 }
605 return true;
606 }
607
608 @Override
609 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
610 FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
611 if (fs.isDirectory()) {
612 throw new FileNotFoundException("'" + f + "' is a directory");
613 }
614 LOG.info("Opening '" + f + "' for reading");
615 Path absolutePath = makeAbsolute(f);
616 String key = pathToKey(absolutePath);
617 return new FSDataInputStream(new BufferedFSInputStream(
618 new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
619 }
620
621 // rename() and delete() use this method to ensure that the parent directory
622 // of the source does not vanish.
623 private void createParent(Path path) throws IOException {
624 Path parent = path.getParent();
625 if (parent != null) {
626 String key = pathToKey(makeAbsolute(parent));
627 if (key.length() > 0) {
628 store.storeEmptyFile(key + FOLDER_SUFFIX);
629 }
630 }
631 }
632
633
/**
 * Rename a file or directory by copying to the destination key(s) and then
 * deleting the source key(s). The copy and delete are separate store
 * calls.
 * <p>
 * Returns false without changing anything when the source is the root,
 * the destination is an existing file, the destination's parent is a file
 * or does not exist, or the source does not exist. When the destination is
 * an existing directory, the source is placed inside it under its own
 * name.
 * </p>
 *
 * @param src path to rename
 * @param dst new path, or an existing directory to move into
 * @return true if the rename was carried out, false otherwise
 * @throws IOException if a store operation fails
 */
@Override
public boolean rename(Path src, Path dst) throws IOException {

  String srcKey = pathToKey(makeAbsolute(src));

  if (srcKey.length() == 0) {
    // Cannot rename root of file system
    return false;
  }

  final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";

  // Figure out the final destination
  String dstKey;
  try {
    boolean dstIsFile = getFileStatus(dst).isFile();
    if (dstIsFile) {
      if(LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble +
            "returning false as dst is an already existing file");
      }
      return false;
    } else {
      if(LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble + "using dst as output directory");
      }
      // dst is an existing directory: the source keeps its name inside it
      dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
    }
  } catch (FileNotFoundException e) {
    // dst does not exist: use it as the new name, provided its parent is
    // an existing directory
    if(LOG.isDebugEnabled()) {
      LOG.debug(debugPreamble + "using dst as output destination");
    }
    dstKey = pathToKey(makeAbsolute(dst));
    try {
      if (getFileStatus(dst.getParent()).isFile()) {
        if(LOG.isDebugEnabled()) {
          LOG.debug(debugPreamble +
              "returning false as dst parent exists and is a file");
        }
        return false;
      }
    } catch (FileNotFoundException ex) {
      if(LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble +
            "returning false as dst parent does not exist");
      }
      return false;
    }
  }

  boolean srcIsFile;
  try {
    srcIsFile = getFileStatus(src).isFile();
  } catch (FileNotFoundException e) {
    if(LOG.isDebugEnabled()) {
      LOG.debug(debugPreamble + "returning false as src does not exist");
    }
    return false;
  }
  if (srcIsFile) {
    if(LOG.isDebugEnabled()) {
      LOG.debug(debugPreamble +
          "src is file, so doing copy then delete in S3");
    }
    store.copy(srcKey, dstKey);
    store.delete(srcKey);
  } else {
    if(LOG.isDebugEnabled()) {
      LOG.debug(debugPreamble + "src is directory, so copying contents");
    }
    // Mark the destination as a directory before copying into it
    store.storeEmptyFile(dstKey + FOLDER_SUFFIX);

    // Copy every listed key first and remember it; deletion happens only
    // after all copies have been issued
    List<String> keysToDelete = new ArrayList<String>();
    String priorLastKey = null;
    do {
      // NOTE(review): the final 'true' argument presumably requests a
      // recursive listing - confirm against NativeFileSystemStore.list
      PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
      for (FileMetadata file : listing.getFiles()) {
        keysToDelete.add(file.getKey());
        // Swap the source key prefix for the destination key prefix
        store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
      }
      priorLastKey = listing.getPriorLastKey();
    } while (priorLastKey != null);

    if(LOG.isDebugEnabled()) {
      LOG.debug(debugPreamble +
          "all files in src copied, now removing src files");
    }
    for (String key: keysToDelete) {
      store.delete(key);
    }

    try {
      store.delete(srcKey + FOLDER_SUFFIX);
    } catch (FileNotFoundException e) {
      //this is fine, we don't require a marker
    }
    if(LOG.isDebugEnabled()) {
      LOG.debug(debugPreamble + "done");
    }
  }

  return true;
}
737
/**
 * @return the value of <code>fs.s3n.block.size</code>, or 64 MB
 * (64 * 1024 * 1024 bytes) when the option is not set
 */
@Override
public long getDefaultBlockSize() {
  return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
}
742
/**
 * Set the working directory to the given directory.
 * The value is stored as given, without being validated or qualified.
 *
 * @param newDir the new working directory
 */
@Override
public void setWorkingDirectory(Path newDir) {
  workingDir = newDir;
}
750
/**
 * @return the current working directory, against which relative paths
 * are resolved
 */
@Override
public Path getWorkingDirectory() {
  return workingDir;
}
755
/**
 * @return always null, because this filesystem does not support tokens
 */
@Override
public String getCanonicalServiceName() {
  // Does not support Token
  return null;
}
761 }