001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.fs.s3native; 020 021 import java.io.BufferedOutputStream; 022 import java.io.EOFException; 023 import java.io.File; 024 import java.io.FileNotFoundException; 025 import java.io.FileOutputStream; 026 import java.io.IOException; 027 import java.io.InputStream; 028 import java.io.OutputStream; 029 import java.net.URI; 030 import java.security.DigestOutputStream; 031 import java.security.MessageDigest; 032 import java.security.NoSuchAlgorithmException; 033 import java.util.ArrayList; 034 import java.util.HashMap; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.Set; 038 import java.util.TreeSet; 039 import java.util.concurrent.TimeUnit; 040 041 import com.google.common.base.Preconditions; 042 import org.apache.hadoop.classification.InterfaceAudience; 043 import org.apache.hadoop.classification.InterfaceStability; 044 import org.apache.hadoop.conf.Configuration; 045 import org.apache.hadoop.fs.BufferedFSInputStream; 046 import org.apache.hadoop.fs.FSDataInputStream; 047 import org.apache.hadoop.fs.FSDataOutputStream; 048 import org.apache.hadoop.fs.FSExceptionMessages; 049 import org.apache.hadoop.fs.FSInputStream; 050 import org.apache.hadoop.fs.FileAlreadyExistsException; 051 import org.apache.hadoop.fs.FileStatus; 052 import org.apache.hadoop.fs.FileSystem; 053 import org.apache.hadoop.fs.LocalDirAllocator; 054 import org.apache.hadoop.fs.Path; 055 import org.apache.hadoop.fs.permission.FsPermission; 056 import org.apache.hadoop.fs.s3.S3Exception; 057 import org.apache.hadoop.io.retry.RetryPolicies; 058 import org.apache.hadoop.io.retry.RetryPolicy; 059 import org.apache.hadoop.io.retry.RetryProxy; 060 import org.apache.hadoop.util.Progressable; 061 import org.slf4j.Logger; 062 import org.slf4j.LoggerFactory; 063 064 /** 065 * <p> 066 * A {@link FileSystem} for reading and writing files stored on 067 * <a href="http://aws.amazon.com/s3">Amazon S3</a>. 068 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation 069 * stores files on S3 in their 070 * native form so they can be read by other S3 tools. 071 * 072 * A note about directories. S3 of course has no "native" support for them. 073 * The idiom we choose then is: for any directory created by this class, 074 * we use an empty object "#{dirpath}_$folder$" as a marker. 075 * Further, to interoperate with other S3 tools, we also accept the following: 076 * - an object "#{dirpath}/' denoting a directory marker 077 * - if there exists any objects with the prefix "#{dirpath}/", then the 078 * directory is said to exist 079 * - if both a file with the name of a directory and a marker for that 080 * directory exists, then the *file masks the directory*, and the directory 081 * is never returned. 082 * </p> 083 * @see org.apache.hadoop.fs.s3.S3FileSystem 084 */ 085 @InterfaceAudience.Public 086 @InterfaceStability.Stable 087 public class NativeS3FileSystem extends FileSystem { 088 089 public static final Logger LOG = 090 LoggerFactory.getLogger(NativeS3FileSystem.class); 091 092 private static final String FOLDER_SUFFIX = "_$folder$"; 093 static final String PATH_DELIMITER = Path.SEPARATOR; 094 private static final int S3_MAX_LISTING_LENGTH = 1000; 095 096 static class NativeS3FsInputStream extends FSInputStream { 097 098 private NativeFileSystemStore store; 099 private Statistics statistics; 100 private InputStream in; 101 private final String key; 102 private long pos = 0; 103 104 public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) { 105 Preconditions.checkNotNull(in, "Null input stream"); 106 this.store = store; 107 this.statistics = statistics; 108 this.in = in; 109 this.key = key; 110 } 111 112 @Override 113 public synchronized int read() throws IOException { 114 int result; 115 try { 116 result = in.read(); 117 } catch (IOException e) { 118 LOG.info("Received IOException while reading '{}', attempting to reopen", 119 key); 120 LOG.debug("{}", e, e); 121 try { 122 seek(pos); 123 result = in.read(); 124 } catch (EOFException eof) { 125 LOG.debug("EOF on input stream read: {}", eof, eof); 126 result = -1; 127 } 128 } 129 if (result != -1) { 130 pos++; 131 } 132 if (statistics != null && result != -1) { 133 statistics.incrementBytesRead(1); 134 } 135 return result; 136 } 137 @Override 138 public synchronized int read(byte[] b, int off, int len) 139 throws IOException { 140 if (in == null) { 141 throw new EOFException("Cannot read closed stream"); 142 } 143 int result = -1; 144 try { 145 result = in.read(b, off, len); 146 } catch (EOFException eof) { 147 throw eof; 148 } catch (IOException e) { 149 LOG.info( "Received IOException while reading '{}'," + 150 " attempting to reopen.", key); 151 seek(pos); 152 result = in.read(b, off, len); 153 } 154 if (result > 0) { 155 pos += result; 156 } 157 if (statistics != null && result > 0) { 158 statistics.incrementBytesRead(result); 159 } 160 return result; 161 } 162 163 @Override 164 public synchronized void close() throws IOException { 165 closeInnerStream(); 166 } 167 168 /** 169 * Close the inner stream if not null. Even if an exception 170 * is raised during the close, the field is set to null 171 * @throws IOException if raised by the close() operation. 172 */ 173 private void closeInnerStream() throws IOException { 174 if (in != null) { 175 try { 176 in.close(); 177 } finally { 178 in = null; 179 } 180 } 181 } 182 183 /** 184 * Update inner stream with a new stream and position 185 * @param newStream new stream -must not be null 186 * @param newpos new position 187 * @throws IOException IO exception on a failure to close the existing 188 * stream. 189 */ 190 private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException { 191 Preconditions.checkNotNull(newStream, "Null newstream argument"); 192 closeInnerStream(); 193 in = newStream; 194 this.pos = newpos; 195 } 196 197 @Override 198 public synchronized void seek(long newpos) throws IOException { 199 if (newpos < 0) { 200 throw new EOFException( 201 FSExceptionMessages.NEGATIVE_SEEK); 202 } 203 if (pos != newpos) { 204 // the seek is attempting to move the current position 205 LOG.debug("Opening key '{}' for reading at position '{}", key, newpos); 206 InputStream newStream = store.retrieve(key, newpos); 207 updateInnerStream(newStream, newpos); 208 } 209 } 210 211 @Override 212 public synchronized long getPos() throws IOException { 213 return pos; 214 } 215 @Override 216 public boolean seekToNewSource(long targetPos) throws IOException { 217 return false; 218 } 219 } 220 221 private class NativeS3FsOutputStream extends OutputStream { 222 223 private Configuration conf; 224 private String key; 225 private File backupFile; 226 private OutputStream backupStream; 227 private MessageDigest digest; 228 private boolean closed; 229 private LocalDirAllocator lDirAlloc; 230 231 public NativeS3FsOutputStream(Configuration conf, 232 NativeFileSystemStore store, String key, Progressable progress, 233 int bufferSize) throws IOException { 234 this.conf = conf; 235 this.key = key; 236 this.backupFile = newBackupFile(); 237 LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'"); 238 try { 239 this.digest = MessageDigest.getInstance("MD5"); 240 this.backupStream = new BufferedOutputStream(new DigestOutputStream( 241 new FileOutputStream(backupFile), this.digest)); 242 } catch (NoSuchAlgorithmException e) { 243 LOG.warn("Cannot load MD5 digest algorithm," + 244 "skipping message integrity check.", e); 245 this.backupStream = new BufferedOutputStream( 246 new FileOutputStream(backupFile)); 247 } 248 } 249 250 private File newBackupFile() throws IOException { 251 if (lDirAlloc == null) { 252 lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir"); 253 } 254 File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf); 255 result.deleteOnExit(); 256 return result; 257 } 258 259 @Override 260 public void flush() throws IOException { 261 backupStream.flush(); 262 } 263 264 @Override 265 public synchronized void close() throws IOException { 266 if (closed) { 267 return; 268 } 269 270 backupStream.close(); 271 LOG.info("OutputStream for key '{}' closed. Now beginning upload", key); 272 273 try { 274 byte[] md5Hash = digest == null ? null : digest.digest(); 275 store.storeFile(key, backupFile, md5Hash); 276 } finally { 277 if (!backupFile.delete()) { 278 LOG.warn("Could not delete temporary s3n file: " + backupFile); 279 } 280 super.close(); 281 closed = true; 282 } 283 LOG.info("OutputStream for key '{}' upload complete", key); 284 } 285 286 @Override 287 public void write(int b) throws IOException { 288 backupStream.write(b); 289 } 290 291 @Override 292 public void write(byte[] b, int off, int len) throws IOException { 293 backupStream.write(b, off, len); 294 } 295 } 296 297 private URI uri; 298 private NativeFileSystemStore store; 299 private Path workingDir; 300 301 public NativeS3FileSystem() { 302 // set store in initialize() 303 } 304 305 public NativeS3FileSystem(NativeFileSystemStore store) { 306 this.store = store; 307 } 308 309 /** 310 * Return the protocol scheme for the FileSystem. 311 * <p/> 312 * 313 * @return <code>s3n</code> 314 */ 315 @Override 316 public String getScheme() { 317 return "s3n"; 318 } 319 320 @Override 321 public void initialize(URI uri, Configuration conf) throws IOException { 322 super.initialize(uri, conf); 323 if (store == null) { 324 store = createDefaultStore(conf); 325 } 326 store.initialize(uri, conf); 327 setConf(conf); 328 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 329 this.workingDir = 330 new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory()); 331 } 332 333 private static NativeFileSystemStore createDefaultStore(Configuration conf) { 334 NativeFileSystemStore store = new Jets3tNativeFileSystemStore(); 335 336 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 337 conf.getInt("fs.s3.maxRetries", 4), 338 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); 339 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 340 new HashMap<Class<? extends Exception>, RetryPolicy>(); 341 exceptionToPolicyMap.put(IOException.class, basePolicy); 342 exceptionToPolicyMap.put(S3Exception.class, basePolicy); 343 344 RetryPolicy methodPolicy = RetryPolicies.retryByException( 345 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 346 Map<String, RetryPolicy> methodNameToPolicyMap = 347 new HashMap<String, RetryPolicy>(); 348 methodNameToPolicyMap.put("storeFile", methodPolicy); 349 methodNameToPolicyMap.put("rename", methodPolicy); 350 351 return (NativeFileSystemStore) 352 RetryProxy.create(NativeFileSystemStore.class, store, 353 methodNameToPolicyMap); 354 } 355 356 private static String pathToKey(Path path) { 357 if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { 358 // allow uris without trailing slash after bucket to refer to root, 359 // like s3n://mybucket 360 return ""; 361 } 362 if (!path.isAbsolute()) { 363 throw new IllegalArgumentException("Path must be absolute: " + path); 364 } 365 String ret = path.toUri().getPath().substring(1); // remove initial slash 366 if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) { 367 ret = ret.substring(0, ret.length() -1); 368 } 369 return ret; 370 } 371 372 private static Path keyToPath(String key) { 373 return new Path("/" + key); 374 } 375 376 private Path makeAbsolute(Path path) { 377 if (path.isAbsolute()) { 378 return path; 379 } 380 return new Path(workingDir, path); 381 } 382 383 /** This optional operation is not yet supported. */ 384 @Override 385 public FSDataOutputStream append(Path f, int bufferSize, 386 Progressable progress) throws IOException { 387 throw new IOException("Not supported"); 388 } 389 390 @Override 391 public FSDataOutputStream create(Path f, FsPermission permission, 392 boolean overwrite, int bufferSize, short replication, long blockSize, 393 Progressable progress) throws IOException { 394 395 if (exists(f) && !overwrite) { 396 throw new FileAlreadyExistsException("File already exists: " + f); 397 } 398 399 if(LOG.isDebugEnabled()) { 400 LOG.debug("Creating new file '" + f + "' in S3"); 401 } 402 Path absolutePath = makeAbsolute(f); 403 String key = pathToKey(absolutePath); 404 return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store, 405 key, progress, bufferSize), statistics); 406 } 407 408 @Override 409 public boolean delete(Path f, boolean recurse) throws IOException { 410 FileStatus status; 411 try { 412 status = getFileStatus(f); 413 } catch (FileNotFoundException e) { 414 if(LOG.isDebugEnabled()) { 415 LOG.debug("Delete called for '" + f + 416 "' but file does not exist, so returning false"); 417 } 418 return false; 419 } 420 Path absolutePath = makeAbsolute(f); 421 String key = pathToKey(absolutePath); 422 if (status.isDirectory()) { 423 if (!recurse && listStatus(f).length > 0) { 424 throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false"); 425 } 426 427 createParent(f); 428 429 if(LOG.isDebugEnabled()) { 430 LOG.debug("Deleting directory '" + f + "'"); 431 } 432 String priorLastKey = null; 433 do { 434 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true); 435 for (FileMetadata file : listing.getFiles()) { 436 store.delete(file.getKey()); 437 } 438 priorLastKey = listing.getPriorLastKey(); 439 } while (priorLastKey != null); 440 441 try { 442 store.delete(key + FOLDER_SUFFIX); 443 } catch (FileNotFoundException e) { 444 //this is fine, we don't require a marker 445 } 446 } else { 447 if(LOG.isDebugEnabled()) { 448 LOG.debug("Deleting file '" + f + "'"); 449 } 450 createParent(f); 451 store.delete(key); 452 } 453 return true; 454 } 455 456 @Override 457 public FileStatus getFileStatus(Path f) throws IOException { 458 Path absolutePath = makeAbsolute(f); 459 String key = pathToKey(absolutePath); 460 461 if (key.length() == 0) { // root always exists 462 return newDirectory(absolutePath); 463 } 464 465 if(LOG.isDebugEnabled()) { 466 LOG.debug("getFileStatus retrieving metadata for key '" + key + "'"); 467 } 468 FileMetadata meta = store.retrieveMetadata(key); 469 if (meta != null) { 470 if(LOG.isDebugEnabled()) { 471 LOG.debug("getFileStatus returning 'file' for key '" + key + "'"); 472 } 473 return newFile(meta, absolutePath); 474 } 475 if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) { 476 if(LOG.isDebugEnabled()) { 477 LOG.debug("getFileStatus returning 'directory' for key '" + key + 478 "' as '" + key + FOLDER_SUFFIX + "' exists"); 479 } 480 return newDirectory(absolutePath); 481 } 482 483 if(LOG.isDebugEnabled()) { 484 LOG.debug("getFileStatus listing key '" + key + "'"); 485 } 486 PartialListing listing = store.list(key, 1); 487 if (listing.getFiles().length > 0 || 488 listing.getCommonPrefixes().length > 0) { 489 if(LOG.isDebugEnabled()) { 490 LOG.debug("getFileStatus returning 'directory' for key '" + key + 491 "' as it has contents"); 492 } 493 return newDirectory(absolutePath); 494 } 495 496 if(LOG.isDebugEnabled()) { 497 LOG.debug("getFileStatus could not find key '" + key + "'"); 498 } 499 throw new FileNotFoundException("No such file or directory '" + absolutePath + "'"); 500 } 501 502 @Override 503 public URI getUri() { 504 return uri; 505 } 506 507 /** 508 * <p> 509 * If <code>f</code> is a file, this method will make a single call to S3. 510 * If <code>f</code> is a directory, this method will make a maximum of 511 * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of 512 * files and directories contained directly in <code>f</code>. 513 * </p> 514 */ 515 @Override 516 public FileStatus[] listStatus(Path f) throws IOException { 517 518 Path absolutePath = makeAbsolute(f); 519 String key = pathToKey(absolutePath); 520 521 if (key.length() > 0) { 522 FileMetadata meta = store.retrieveMetadata(key); 523 if (meta != null) { 524 return new FileStatus[] { newFile(meta, absolutePath) }; 525 } 526 } 527 528 URI pathUri = absolutePath.toUri(); 529 Set<FileStatus> status = new TreeSet<FileStatus>(); 530 String priorLastKey = null; 531 do { 532 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false); 533 for (FileMetadata fileMetadata : listing.getFiles()) { 534 Path subpath = keyToPath(fileMetadata.getKey()); 535 String relativePath = pathUri.relativize(subpath.toUri()).getPath(); 536 537 if (fileMetadata.getKey().equals(key + "/")) { 538 // this is just the directory we have been asked to list 539 } 540 else if (relativePath.endsWith(FOLDER_SUFFIX)) { 541 status.add(newDirectory(new Path( 542 absolutePath, 543 relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX))))); 544 } 545 else { 546 status.add(newFile(fileMetadata, subpath)); 547 } 548 } 549 for (String commonPrefix : listing.getCommonPrefixes()) { 550 Path subpath = keyToPath(commonPrefix); 551 String relativePath = pathUri.relativize(subpath.toUri()).getPath(); 552 status.add(newDirectory(new Path(absolutePath, relativePath))); 553 } 554 priorLastKey = listing.getPriorLastKey(); 555 } while (priorLastKey != null); 556 557 if (status.isEmpty() && 558 key.length() > 0 && 559 store.retrieveMetadata(key + FOLDER_SUFFIX) == null) { 560 throw new FileNotFoundException("File " + f + " does not exist."); 561 } 562 563 return status.toArray(new FileStatus[status.size()]); 564 } 565 566 private FileStatus newFile(FileMetadata meta, Path path) { 567 return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(), 568 meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory())); 569 } 570 571 private FileStatus newDirectory(Path path) { 572 return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory())); 573 } 574 575 @Override 576 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 577 Path absolutePath = makeAbsolute(f); 578 List<Path> paths = new ArrayList<Path>(); 579 do { 580 paths.add(0, absolutePath); 581 absolutePath = absolutePath.getParent(); 582 } while (absolutePath != null); 583 584 boolean result = true; 585 for (Path path : paths) { 586 result &= mkdir(path); 587 } 588 return result; 589 } 590 591 private boolean mkdir(Path f) throws IOException { 592 try { 593 FileStatus fileStatus = getFileStatus(f); 594 if (fileStatus.isFile()) { 595 throw new FileAlreadyExistsException(String.format( 596 "Can't make directory for path '%s' since it is a file.", f)); 597 598 } 599 } catch (FileNotFoundException e) { 600 if(LOG.isDebugEnabled()) { 601 LOG.debug("Making dir '" + f + "' in S3"); 602 } 603 String key = pathToKey(f) + FOLDER_SUFFIX; 604 store.storeEmptyFile(key); 605 } 606 return true; 607 } 608 609 @Override 610 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 611 FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist 612 if (fs.isDirectory()) { 613 throw new FileNotFoundException("'" + f + "' is a directory"); 614 } 615 LOG.info("Opening '" + f + "' for reading"); 616 Path absolutePath = makeAbsolute(f); 617 String key = pathToKey(absolutePath); 618 return new FSDataInputStream(new BufferedFSInputStream( 619 new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize)); 620 } 621 622 // rename() and delete() use this method to ensure that the parent directory 623 // of the source does not vanish. 624 private void createParent(Path path) throws IOException { 625 Path parent = path.getParent(); 626 if (parent != null) { 627 String key = pathToKey(makeAbsolute(parent)); 628 if (key.length() > 0) { 629 store.storeEmptyFile(key + FOLDER_SUFFIX); 630 } 631 } 632 } 633 634 635 @Override 636 public boolean rename(Path src, Path dst) throws IOException { 637 638 String srcKey = pathToKey(makeAbsolute(src)); 639 640 if (srcKey.length() == 0) { 641 // Cannot rename root of file system 642 return false; 643 } 644 645 final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - "; 646 647 // Figure out the final destination 648 String dstKey; 649 try { 650 boolean dstIsFile = getFileStatus(dst).isFile(); 651 if (dstIsFile) { 652 if(LOG.isDebugEnabled()) { 653 LOG.debug(debugPreamble + 654 "returning false as dst is an already existing file"); 655 } 656 return false; 657 } else { 658 if(LOG.isDebugEnabled()) { 659 LOG.debug(debugPreamble + "using dst as output directory"); 660 } 661 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); 662 } 663 } catch (FileNotFoundException e) { 664 if(LOG.isDebugEnabled()) { 665 LOG.debug(debugPreamble + "using dst as output destination"); 666 } 667 dstKey = pathToKey(makeAbsolute(dst)); 668 try { 669 if (getFileStatus(dst.getParent()).isFile()) { 670 if(LOG.isDebugEnabled()) { 671 LOG.debug(debugPreamble + 672 "returning false as dst parent exists and is a file"); 673 } 674 return false; 675 } 676 } catch (FileNotFoundException ex) { 677 if(LOG.isDebugEnabled()) { 678 LOG.debug(debugPreamble + 679 "returning false as dst parent does not exist"); 680 } 681 return false; 682 } 683 } 684 685 boolean srcIsFile; 686 try { 687 srcIsFile = getFileStatus(src).isFile(); 688 } catch (FileNotFoundException e) { 689 if(LOG.isDebugEnabled()) { 690 LOG.debug(debugPreamble + "returning false as src does not exist"); 691 } 692 return false; 693 } 694 if (srcIsFile) { 695 if(LOG.isDebugEnabled()) { 696 LOG.debug(debugPreamble + 697 "src is file, so doing copy then delete in S3"); 698 } 699 store.copy(srcKey, dstKey); 700 store.delete(srcKey); 701 } else { 702 if(LOG.isDebugEnabled()) { 703 LOG.debug(debugPreamble + "src is directory, so copying contents"); 704 } 705 store.storeEmptyFile(dstKey + FOLDER_SUFFIX); 706 707 List<String> keysToDelete = new ArrayList<String>(); 708 String priorLastKey = null; 709 do { 710 PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true); 711 for (FileMetadata file : listing.getFiles()) { 712 keysToDelete.add(file.getKey()); 713 store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length())); 714 } 715 priorLastKey = listing.getPriorLastKey(); 716 } while (priorLastKey != null); 717 718 if(LOG.isDebugEnabled()) { 719 LOG.debug(debugPreamble + 720 "all files in src copied, now removing src files"); 721 } 722 for (String key: keysToDelete) { 723 store.delete(key); 724 } 725 726 try { 727 store.delete(srcKey + FOLDER_SUFFIX); 728 } catch (FileNotFoundException e) { 729 //this is fine, we don't require a marker 730 } 731 if(LOG.isDebugEnabled()) { 732 LOG.debug(debugPreamble + "done"); 733 } 734 } 735 736 return true; 737 } 738 739 @Override 740 public long getDefaultBlockSize() { 741 return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024); 742 } 743 744 /** 745 * Set the working directory to the given directory. 746 */ 747 @Override 748 public void setWorkingDirectory(Path newDir) { 749 workingDir = newDir; 750 } 751 752 @Override 753 public Path getWorkingDirectory() { 754 return workingDir; 755 } 756 757 @Override 758 public String getCanonicalServiceName() { 759 // Does not support Token 760 return null; 761 } 762 }