001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.s3native; 020 021import java.io.BufferedOutputStream; 022import java.io.EOFException; 023import java.io.File; 024import java.io.FileNotFoundException; 025import java.io.FileOutputStream; 026import java.io.IOException; 027import java.io.InputStream; 028import java.io.OutputStream; 029import java.net.URI; 030import java.security.DigestOutputStream; 031import java.security.MessageDigest; 032import java.security.NoSuchAlgorithmException; 033import java.util.ArrayList; 034import java.util.HashMap; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.TreeSet; 039import java.util.concurrent.TimeUnit; 040 041import com.google.common.base.Preconditions; 042import org.apache.hadoop.classification.InterfaceAudience; 043import org.apache.hadoop.classification.InterfaceStability; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.BufferedFSInputStream; 046import org.apache.hadoop.fs.FSDataInputStream; 047import org.apache.hadoop.fs.FSDataOutputStream; 048import org.apache.hadoop.fs.FSExceptionMessages; 049import org.apache.hadoop.fs.FSInputStream; 050import org.apache.hadoop.fs.FileAlreadyExistsException; 051import org.apache.hadoop.fs.FileStatus; 052import org.apache.hadoop.fs.FileSystem; 053import org.apache.hadoop.fs.LocalDirAllocator; 054import org.apache.hadoop.fs.Path; 055import org.apache.hadoop.fs.permission.FsPermission; 056import org.apache.hadoop.fs.s3.S3Exception; 057import org.apache.hadoop.io.IOUtils; 058import org.apache.hadoop.io.retry.RetryPolicies; 059import org.apache.hadoop.io.retry.RetryPolicy; 060import org.apache.hadoop.io.retry.RetryProxy; 061import org.apache.hadoop.util.Progressable; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * <p> 067 * A {@link FileSystem} for reading and writing files stored on 068 * <a href="http://aws.amazon.com/s3">Amazon S3</a>. 069 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation 070 * stores files on S3 in their 071 * native form so they can be read by other S3 tools. 072 * 073 * A note about directories. S3 of course has no "native" support for them. 074 * The idiom we choose then is: for any directory created by this class, 075 * we use an empty object "#{dirpath}_$folder$" as a marker. 076 * Further, to interoperate with other S3 tools, we also accept the following: 077 * - an object "#{dirpath}/' denoting a directory marker 078 * - if there exists any objects with the prefix "#{dirpath}/", then the 079 * directory is said to exist 080 * - if both a file with the name of a directory and a marker for that 081 * directory exists, then the *file masks the directory*, and the directory 082 * is never returned. 083 * </p> 084 * @see org.apache.hadoop.fs.s3.S3FileSystem 085 */ 086@InterfaceAudience.Public 087@InterfaceStability.Stable 088public class NativeS3FileSystem extends FileSystem { 089 090 public static final Logger LOG = 091 LoggerFactory.getLogger(NativeS3FileSystem.class); 092 093 private static final String FOLDER_SUFFIX = "_$folder$"; 094 static final String PATH_DELIMITER = Path.SEPARATOR; 095 private static final int S3_MAX_LISTING_LENGTH = 1000; 096 097 static class NativeS3FsInputStream extends FSInputStream { 098 099 private NativeFileSystemStore store; 100 private Statistics statistics; 101 private InputStream in; 102 private final String key; 103 private long pos = 0; 104 105 public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) { 106 Preconditions.checkNotNull(in, "Null input stream"); 107 this.store = store; 108 this.statistics = statistics; 109 this.in = in; 110 this.key = key; 111 } 112 113 @Override 114 public synchronized int read() throws IOException { 115 int result; 116 try { 117 result = in.read(); 118 } catch (IOException e) { 119 LOG.info("Received IOException while reading '{}', attempting to reopen", 120 key); 121 LOG.debug("{}", e, e); 122 try { 123 reopen(pos); 124 result = in.read(); 125 } catch (EOFException eof) { 126 LOG.debug("EOF on input stream read: {}", eof, eof); 127 result = -1; 128 } 129 } 130 if (result != -1) { 131 pos++; 132 } 133 if (statistics != null && result != -1) { 134 statistics.incrementBytesRead(1); 135 } 136 return result; 137 } 138 @Override 139 public synchronized int read(byte[] b, int off, int len) 140 throws IOException { 141 if (in == null) { 142 throw new EOFException("Cannot read closed stream"); 143 } 144 int result = -1; 145 try { 146 result = in.read(b, off, len); 147 } catch (EOFException eof) { 148 throw eof; 149 } catch (IOException e) { 150 LOG.info( "Received IOException while reading '{}'," + 151 " attempting to reopen.", key); 152 reopen(pos); 153 result = in.read(b, off, len); 154 } 155 if (result > 0) { 156 pos += result; 157 } 158 if (statistics != null && result > 0) { 159 statistics.incrementBytesRead(result); 160 } 161 return result; 162 } 163 164 @Override 165 public synchronized void close() throws IOException { 166 closeInnerStream(); 167 } 168 169 /** 170 * Close the inner stream if not null. Even if an exception 171 * is raised during the close, the field is set to null 172 */ 173 private void closeInnerStream() { 174 IOUtils.closeStream(in); 175 in = null; 176 } 177 178 /** 179 * Reopen a new input stream with the specified position 180 * @param pos the position to reopen a new stream 181 * @throws IOException 182 */ 183 private synchronized void reopen(long pos) throws IOException { 184 LOG.debug("Reopening key '{}' for reading at position '{}", key, pos); 185 InputStream newStream = store.retrieve(key, pos); 186 updateInnerStream(newStream, pos); 187 } 188 189 /** 190 * Update inner stream with a new stream and position 191 * @param newStream new stream -must not be null 192 * @param newpos new position 193 * @throws IOException IO exception on a failure to close the existing 194 * stream. 195 */ 196 private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException { 197 Preconditions.checkNotNull(newStream, "Null newstream argument"); 198 closeInnerStream(); 199 in = newStream; 200 this.pos = newpos; 201 } 202 203 @Override 204 public synchronized void seek(long newpos) throws IOException { 205 if (newpos < 0) { 206 throw new EOFException( 207 FSExceptionMessages.NEGATIVE_SEEK); 208 } 209 if (pos != newpos) { 210 // the seek is attempting to move the current position 211 reopen(newpos); 212 } 213 } 214 215 @Override 216 public synchronized long getPos() throws IOException { 217 return pos; 218 } 219 @Override 220 public boolean seekToNewSource(long targetPos) throws IOException { 221 return false; 222 } 223 } 224 225 private class NativeS3FsOutputStream extends OutputStream { 226 227 private Configuration conf; 228 private String key; 229 private File backupFile; 230 private OutputStream backupStream; 231 private MessageDigest digest; 232 private boolean closed; 233 private LocalDirAllocator lDirAlloc; 234 235 public NativeS3FsOutputStream(Configuration conf, 236 NativeFileSystemStore store, String key, Progressable progress, 237 int bufferSize) throws IOException { 238 this.conf = conf; 239 this.key = key; 240 this.backupFile = newBackupFile(); 241 LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'"); 242 try { 243 this.digest = MessageDigest.getInstance("MD5"); 244 this.backupStream = new BufferedOutputStream(new DigestOutputStream( 245 new FileOutputStream(backupFile), this.digest)); 246 } catch (NoSuchAlgorithmException e) { 247 LOG.warn("Cannot load MD5 digest algorithm," + 248 "skipping message integrity check.", e); 249 this.backupStream = new BufferedOutputStream( 250 new FileOutputStream(backupFile)); 251 } 252 } 253 254 private File newBackupFile() throws IOException { 255 if (lDirAlloc == null) { 256 lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir"); 257 } 258 File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf); 259 result.deleteOnExit(); 260 return result; 261 } 262 263 @Override 264 public void flush() throws IOException { 265 backupStream.flush(); 266 } 267 268 @Override 269 public synchronized void close() throws IOException { 270 if (closed) { 271 return; 272 } 273 274 backupStream.close(); 275 LOG.info("OutputStream for key '{}' closed. Now beginning upload", key); 276 277 try { 278 byte[] md5Hash = digest == null ? null : digest.digest(); 279 store.storeFile(key, backupFile, md5Hash); 280 } finally { 281 if (!backupFile.delete()) { 282 LOG.warn("Could not delete temporary s3n file: " + backupFile); 283 } 284 super.close(); 285 closed = true; 286 } 287 LOG.info("OutputStream for key '{}' upload complete", key); 288 } 289 290 @Override 291 public void write(int b) throws IOException { 292 backupStream.write(b); 293 } 294 295 @Override 296 public void write(byte[] b, int off, int len) throws IOException { 297 backupStream.write(b, off, len); 298 } 299 } 300 301 private URI uri; 302 private NativeFileSystemStore store; 303 private Path workingDir; 304 305 public NativeS3FileSystem() { 306 // set store in initialize() 307 } 308 309 public NativeS3FileSystem(NativeFileSystemStore store) { 310 this.store = store; 311 } 312 313 /** 314 * Return the protocol scheme for the FileSystem. 315 * <p/> 316 * 317 * @return <code>s3n</code> 318 */ 319 @Override 320 public String getScheme() { 321 return "s3n"; 322 } 323 324 @Override 325 public void initialize(URI uri, Configuration conf) throws IOException { 326 super.initialize(uri, conf); 327 if (store == null) { 328 store = createDefaultStore(conf); 329 } 330 store.initialize(uri, conf); 331 setConf(conf); 332 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 333 this.workingDir = 334 new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory()); 335 } 336 337 private static NativeFileSystemStore createDefaultStore(Configuration conf) { 338 NativeFileSystemStore store = new Jets3tNativeFileSystemStore(); 339 340 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 341 conf.getInt("fs.s3.maxRetries", 4), 342 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); 343 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 344 new HashMap<Class<? extends Exception>, RetryPolicy>(); 345 exceptionToPolicyMap.put(IOException.class, basePolicy); 346 exceptionToPolicyMap.put(S3Exception.class, basePolicy); 347 348 RetryPolicy methodPolicy = RetryPolicies.retryByException( 349 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 350 Map<String, RetryPolicy> methodNameToPolicyMap = 351 new HashMap<String, RetryPolicy>(); 352 methodNameToPolicyMap.put("storeFile", methodPolicy); 353 methodNameToPolicyMap.put("rename", methodPolicy); 354 355 return (NativeFileSystemStore) 356 RetryProxy.create(NativeFileSystemStore.class, store, 357 methodNameToPolicyMap); 358 } 359 360 private static String pathToKey(Path path) { 361 if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { 362 // allow uris without trailing slash after bucket to refer to root, 363 // like s3n://mybucket 364 return ""; 365 } 366 if (!path.isAbsolute()) { 367 throw new IllegalArgumentException("Path must be absolute: " + path); 368 } 369 String ret = path.toUri().getPath().substring(1); // remove initial slash 370 if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) { 371 ret = ret.substring(0, ret.length() -1); 372 } 373 return ret; 374 } 375 376 private static Path keyToPath(String key) { 377 return new Path("/" + key); 378 } 379 380 private Path makeAbsolute(Path path) { 381 if (path.isAbsolute()) { 382 return path; 383 } 384 return new Path(workingDir, path); 385 } 386 387 /** This optional operation is not yet supported. */ 388 @Override 389 public FSDataOutputStream append(Path f, int bufferSize, 390 Progressable progress) throws IOException { 391 throw new IOException("Not supported"); 392 } 393 394 @Override 395 public FSDataOutputStream create(Path f, FsPermission permission, 396 boolean overwrite, int bufferSize, short replication, long blockSize, 397 Progressable progress) throws IOException { 398 399 if (exists(f) && !overwrite) { 400 throw new FileAlreadyExistsException("File already exists: " + f); 401 } 402 403 if(LOG.isDebugEnabled()) { 404 LOG.debug("Creating new file '" + f + "' in S3"); 405 } 406 Path absolutePath = makeAbsolute(f); 407 String key = pathToKey(absolutePath); 408 return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store, 409 key, progress, bufferSize), statistics); 410 } 411 412 @Override 413 public boolean delete(Path f, boolean recurse) throws IOException { 414 FileStatus status; 415 try { 416 status = getFileStatus(f); 417 } catch (FileNotFoundException e) { 418 if(LOG.isDebugEnabled()) { 419 LOG.debug("Delete called for '" + f + 420 "' but file does not exist, so returning false"); 421 } 422 return false; 423 } 424 Path absolutePath = makeAbsolute(f); 425 String key = pathToKey(absolutePath); 426 if (status.isDirectory()) { 427 if (!recurse && listStatus(f).length > 0) { 428 throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false"); 429 } 430 431 createParent(f); 432 433 if(LOG.isDebugEnabled()) { 434 LOG.debug("Deleting directory '" + f + "'"); 435 } 436 String priorLastKey = null; 437 do { 438 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true); 439 for (FileMetadata file : listing.getFiles()) { 440 store.delete(file.getKey()); 441 } 442 priorLastKey = listing.getPriorLastKey(); 443 } while (priorLastKey != null); 444 445 try { 446 store.delete(key + FOLDER_SUFFIX); 447 } catch (FileNotFoundException e) { 448 //this is fine, we don't require a marker 449 } 450 } else { 451 if(LOG.isDebugEnabled()) { 452 LOG.debug("Deleting file '" + f + "'"); 453 } 454 createParent(f); 455 store.delete(key); 456 } 457 return true; 458 } 459 460 @Override 461 public FileStatus getFileStatus(Path f) throws IOException { 462 Path absolutePath = makeAbsolute(f); 463 String key = pathToKey(absolutePath); 464 465 if (key.length() == 0) { // root always exists 466 return newDirectory(absolutePath); 467 } 468 469 if(LOG.isDebugEnabled()) { 470 LOG.debug("getFileStatus retrieving metadata for key '" + key + "'"); 471 } 472 FileMetadata meta = store.retrieveMetadata(key); 473 if (meta != null) { 474 if(LOG.isDebugEnabled()) { 475 LOG.debug("getFileStatus returning 'file' for key '" + key + "'"); 476 } 477 return newFile(meta, absolutePath); 478 } 479 if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) { 480 if(LOG.isDebugEnabled()) { 481 LOG.debug("getFileStatus returning 'directory' for key '" + key + 482 "' as '" + key + FOLDER_SUFFIX + "' exists"); 483 } 484 return newDirectory(absolutePath); 485 } 486 487 if(LOG.isDebugEnabled()) { 488 LOG.debug("getFileStatus listing key '" + key + "'"); 489 } 490 PartialListing listing = store.list(key, 1); 491 if (listing.getFiles().length > 0 || 492 listing.getCommonPrefixes().length > 0) { 493 if(LOG.isDebugEnabled()) { 494 LOG.debug("getFileStatus returning 'directory' for key '" + key + 495 "' as it has contents"); 496 } 497 return newDirectory(absolutePath); 498 } 499 500 if(LOG.isDebugEnabled()) { 501 LOG.debug("getFileStatus could not find key '" + key + "'"); 502 } 503 throw new FileNotFoundException("No such file or directory '" + absolutePath + "'"); 504 } 505 506 @Override 507 public URI getUri() { 508 return uri; 509 } 510 511 /** 512 * <p> 513 * If <code>f</code> is a file, this method will make a single call to S3. 514 * If <code>f</code> is a directory, this method will make a maximum of 515 * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of 516 * files and directories contained directly in <code>f</code>. 517 * </p> 518 */ 519 @Override 520 public FileStatus[] listStatus(Path f) throws IOException { 521 522 Path absolutePath = makeAbsolute(f); 523 String key = pathToKey(absolutePath); 524 525 if (key.length() > 0) { 526 FileMetadata meta = store.retrieveMetadata(key); 527 if (meta != null) { 528 return new FileStatus[] { newFile(meta, absolutePath) }; 529 } 530 } 531 532 URI pathUri = absolutePath.toUri(); 533 Set<FileStatus> status = new TreeSet<FileStatus>(); 534 String priorLastKey = null; 535 do { 536 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false); 537 for (FileMetadata fileMetadata : listing.getFiles()) { 538 Path subpath = keyToPath(fileMetadata.getKey()); 539 String relativePath = pathUri.relativize(subpath.toUri()).getPath(); 540 541 if (fileMetadata.getKey().equals(key + "/")) { 542 // this is just the directory we have been asked to list 543 } 544 else if (relativePath.endsWith(FOLDER_SUFFIX)) { 545 status.add(newDirectory(new Path( 546 absolutePath, 547 relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX))))); 548 } 549 else { 550 status.add(newFile(fileMetadata, subpath)); 551 } 552 } 553 for (String commonPrefix : listing.getCommonPrefixes()) { 554 Path subpath = keyToPath(commonPrefix); 555 String relativePath = pathUri.relativize(subpath.toUri()).getPath(); 556 status.add(newDirectory(new Path(absolutePath, relativePath))); 557 } 558 priorLastKey = listing.getPriorLastKey(); 559 } while (priorLastKey != null); 560 561 if (status.isEmpty() && 562 key.length() > 0 && 563 store.retrieveMetadata(key + FOLDER_SUFFIX) == null) { 564 throw new FileNotFoundException("File " + f + " does not exist."); 565 } 566 567 return status.toArray(new FileStatus[status.size()]); 568 } 569 570 private FileStatus newFile(FileMetadata meta, Path path) { 571 return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(), 572 meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory())); 573 } 574 575 private FileStatus newDirectory(Path path) { 576 return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory())); 577 } 578 579 @Override 580 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 581 Path absolutePath = makeAbsolute(f); 582 List<Path> paths = new ArrayList<Path>(); 583 do { 584 paths.add(0, absolutePath); 585 absolutePath = absolutePath.getParent(); 586 } while (absolutePath != null); 587 588 boolean result = true; 589 for (Path path : paths) { 590 result &= mkdir(path); 591 } 592 return result; 593 } 594 595 private boolean mkdir(Path f) throws IOException { 596 try { 597 FileStatus fileStatus = getFileStatus(f); 598 if (fileStatus.isFile()) { 599 throw new FileAlreadyExistsException(String.format( 600 "Can't make directory for path '%s' since it is a file.", f)); 601 602 } 603 } catch (FileNotFoundException e) { 604 if(LOG.isDebugEnabled()) { 605 LOG.debug("Making dir '" + f + "' in S3"); 606 } 607 String key = pathToKey(f) + FOLDER_SUFFIX; 608 store.storeEmptyFile(key); 609 } 610 return true; 611 } 612 613 @Override 614 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 615 FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist 616 if (fs.isDirectory()) { 617 throw new FileNotFoundException("'" + f + "' is a directory"); 618 } 619 LOG.info("Opening '" + f + "' for reading"); 620 Path absolutePath = makeAbsolute(f); 621 String key = pathToKey(absolutePath); 622 return new FSDataInputStream(new BufferedFSInputStream( 623 new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize)); 624 } 625 626 // rename() and delete() use this method to ensure that the parent directory 627 // of the source does not vanish. 628 private void createParent(Path path) throws IOException { 629 Path parent = path.getParent(); 630 if (parent != null) { 631 String key = pathToKey(makeAbsolute(parent)); 632 if (key.length() > 0) { 633 store.storeEmptyFile(key + FOLDER_SUFFIX); 634 } 635 } 636 } 637 638 639 @Override 640 public boolean rename(Path src, Path dst) throws IOException { 641 642 String srcKey = pathToKey(makeAbsolute(src)); 643 644 if (srcKey.length() == 0) { 645 // Cannot rename root of file system 646 return false; 647 } 648 649 final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - "; 650 651 // Figure out the final destination 652 String dstKey; 653 try { 654 boolean dstIsFile = getFileStatus(dst).isFile(); 655 if (dstIsFile) { 656 if(LOG.isDebugEnabled()) { 657 LOG.debug(debugPreamble + 658 "returning false as dst is an already existing file"); 659 } 660 return false; 661 } else { 662 if(LOG.isDebugEnabled()) { 663 LOG.debug(debugPreamble + "using dst as output directory"); 664 } 665 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); 666 } 667 } catch (FileNotFoundException e) { 668 if(LOG.isDebugEnabled()) { 669 LOG.debug(debugPreamble + "using dst as output destination"); 670 } 671 dstKey = pathToKey(makeAbsolute(dst)); 672 try { 673 if (getFileStatus(dst.getParent()).isFile()) { 674 if(LOG.isDebugEnabled()) { 675 LOG.debug(debugPreamble + 676 "returning false as dst parent exists and is a file"); 677 } 678 return false; 679 } 680 } catch (FileNotFoundException ex) { 681 if(LOG.isDebugEnabled()) { 682 LOG.debug(debugPreamble + 683 "returning false as dst parent does not exist"); 684 } 685 return false; 686 } 687 } 688 689 boolean srcIsFile; 690 try { 691 srcIsFile = getFileStatus(src).isFile(); 692 } catch (FileNotFoundException e) { 693 if(LOG.isDebugEnabled()) { 694 LOG.debug(debugPreamble + "returning false as src does not exist"); 695 } 696 return false; 697 } 698 if (srcIsFile) { 699 if(LOG.isDebugEnabled()) { 700 LOG.debug(debugPreamble + 701 "src is file, so doing copy then delete in S3"); 702 } 703 store.copy(srcKey, dstKey); 704 store.delete(srcKey); 705 } else { 706 if(LOG.isDebugEnabled()) { 707 LOG.debug(debugPreamble + "src is directory, so copying contents"); 708 } 709 store.storeEmptyFile(dstKey + FOLDER_SUFFIX); 710 711 List<String> keysToDelete = new ArrayList<String>(); 712 String priorLastKey = null; 713 do { 714 PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true); 715 for (FileMetadata file : listing.getFiles()) { 716 keysToDelete.add(file.getKey()); 717 store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length())); 718 } 719 priorLastKey = listing.getPriorLastKey(); 720 } while (priorLastKey != null); 721 722 if(LOG.isDebugEnabled()) { 723 LOG.debug(debugPreamble + 724 "all files in src copied, now removing src files"); 725 } 726 for (String key: keysToDelete) { 727 store.delete(key); 728 } 729 730 try { 731 store.delete(srcKey + FOLDER_SUFFIX); 732 } catch (FileNotFoundException e) { 733 //this is fine, we don't require a marker 734 } 735 if(LOG.isDebugEnabled()) { 736 LOG.debug(debugPreamble + "done"); 737 } 738 } 739 740 return true; 741 } 742 743 @Override 744 public long getDefaultBlockSize() { 745 return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024); 746 } 747 748 /** 749 * Set the working directory to the given directory. 750 */ 751 @Override 752 public void setWorkingDirectory(Path newDir) { 753 workingDir = newDir; 754 } 755 756 @Override 757 public Path getWorkingDirectory() { 758 return workingDir; 759 } 760 761 @Override 762 public String getCanonicalServiceName() { 763 // Does not support Token 764 return null; 765 } 766}