001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.fs.s3native; 020 021 import java.io.BufferedOutputStream; 022 import java.io.EOFException; 023 import java.io.File; 024 import java.io.FileNotFoundException; 025 import java.io.FileOutputStream; 026 import java.io.IOException; 027 import java.io.InputStream; 028 import java.io.OutputStream; 029 import java.net.URI; 030 import java.security.DigestOutputStream; 031 import java.security.MessageDigest; 032 import java.security.NoSuchAlgorithmException; 033 import java.util.ArrayList; 034 import java.util.HashMap; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.Set; 038 import java.util.TreeSet; 039 import java.util.concurrent.TimeUnit; 040 041 import com.google.common.base.Preconditions; 042 import org.apache.hadoop.classification.InterfaceAudience; 043 import org.apache.hadoop.classification.InterfaceStability; 044 import org.apache.hadoop.conf.Configuration; 045 import org.apache.hadoop.fs.BufferedFSInputStream; 046 import org.apache.hadoop.fs.FSDataInputStream; 047 import org.apache.hadoop.fs.FSDataOutputStream; 048 import org.apache.hadoop.fs.FSExceptionMessages; 049 import org.apache.hadoop.fs.FSInputStream; 050 import org.apache.hadoop.fs.FileAlreadyExistsException; 051 import org.apache.hadoop.fs.FileStatus; 052 import org.apache.hadoop.fs.FileSystem; 053 import org.apache.hadoop.fs.Path; 054 import org.apache.hadoop.fs.permission.FsPermission; 055 import org.apache.hadoop.fs.s3.S3Exception; 056 import org.apache.hadoop.io.retry.RetryPolicies; 057 import org.apache.hadoop.io.retry.RetryPolicy; 058 import org.apache.hadoop.io.retry.RetryProxy; 059 import org.apache.hadoop.util.Progressable; 060 import org.slf4j.Logger; 061 import org.slf4j.LoggerFactory; 062 063 /** 064 * <p> 065 * A {@link FileSystem} for reading and writing files stored on 066 * <a href="http://aws.amazon.com/s3">Amazon S3</a>. 067 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation 068 * stores files on S3 in their 069 * native form so they can be read by other S3 tools. 070 * 071 * A note about directories. S3 of course has no "native" support for them. 072 * The idiom we choose then is: for any directory created by this class, 073 * we use an empty object "#{dirpath}_$folder$" as a marker. 074 * Further, to interoperate with other S3 tools, we also accept the following: 075 * - an object "#{dirpath}/' denoting a directory marker 076 * - if there exists any objects with the prefix "#{dirpath}/", then the 077 * directory is said to exist 078 * - if both a file with the name of a directory and a marker for that 079 * directory exists, then the *file masks the directory*, and the directory 080 * is never returned. 081 * </p> 082 * @see org.apache.hadoop.fs.s3.S3FileSystem 083 */ 084 @InterfaceAudience.Public 085 @InterfaceStability.Stable 086 public class NativeS3FileSystem extends FileSystem { 087 088 public static final Logger LOG = 089 LoggerFactory.getLogger(NativeS3FileSystem.class); 090 091 private static final String FOLDER_SUFFIX = "_$folder$"; 092 static final String PATH_DELIMITER = Path.SEPARATOR; 093 private static final int S3_MAX_LISTING_LENGTH = 1000; 094 095 static class NativeS3FsInputStream extends FSInputStream { 096 097 private NativeFileSystemStore store; 098 private Statistics statistics; 099 private InputStream in; 100 private final String key; 101 private long pos = 0; 102 103 public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) { 104 Preconditions.checkNotNull(in, "Null input stream"); 105 this.store = store; 106 this.statistics = statistics; 107 this.in = in; 108 this.key = key; 109 } 110 111 @Override 112 public synchronized int read() throws IOException { 113 int result; 114 try { 115 result = in.read(); 116 } catch (IOException e) { 117 LOG.info("Received IOException while reading '{}', attempting to reopen", 118 key); 119 LOG.debug("{}", e, e); 120 try { 121 seek(pos); 122 result = in.read(); 123 } catch (EOFException eof) { 124 LOG.debug("EOF on input stream read: {}", eof, eof); 125 result = -1; 126 } 127 } 128 if (result != -1) { 129 pos++; 130 } 131 if (statistics != null && result != -1) { 132 statistics.incrementBytesRead(1); 133 } 134 return result; 135 } 136 @Override 137 public synchronized int read(byte[] b, int off, int len) 138 throws IOException { 139 if (in == null) { 140 throw new EOFException("Cannot read closed stream"); 141 } 142 int result = -1; 143 try { 144 result = in.read(b, off, len); 145 } catch (EOFException eof) { 146 throw eof; 147 } catch (IOException e) { 148 LOG.info( "Received IOException while reading '{}'," + 149 " attempting to reopen.", key); 150 seek(pos); 151 result = in.read(b, off, len); 152 } 153 if (result > 0) { 154 pos += result; 155 } 156 if (statistics != null && result > 0) { 157 statistics.incrementBytesRead(result); 158 } 159 return result; 160 } 161 162 @Override 163 public synchronized void close() throws IOException { 164 closeInnerStream(); 165 } 166 167 /** 168 * Close the inner stream if not null. Even if an exception 169 * is raised during the close, the field is set to null 170 * @throws IOException if raised by the close() operation. 171 */ 172 private void closeInnerStream() throws IOException { 173 if (in != null) { 174 try { 175 in.close(); 176 } finally { 177 in = null; 178 } 179 } 180 } 181 182 /** 183 * Update inner stream with a new stream and position 184 * @param newStream new stream -must not be null 185 * @param newpos new position 186 * @throws IOException IO exception on a failure to close the existing 187 * stream. 188 */ 189 private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException { 190 Preconditions.checkNotNull(newStream, "Null newstream argument"); 191 closeInnerStream(); 192 in = newStream; 193 this.pos = newpos; 194 } 195 196 @Override 197 public synchronized void seek(long newpos) throws IOException { 198 if (newpos < 0) { 199 throw new EOFException( 200 FSExceptionMessages.NEGATIVE_SEEK); 201 } 202 if (pos != newpos) { 203 // the seek is attempting to move the current position 204 LOG.debug("Opening key '{}' for reading at position '{}", key, newpos); 205 InputStream newStream = store.retrieve(key, newpos); 206 updateInnerStream(newStream, newpos); 207 } 208 } 209 210 @Override 211 public synchronized long getPos() throws IOException { 212 return pos; 213 } 214 @Override 215 public boolean seekToNewSource(long targetPos) throws IOException { 216 return false; 217 } 218 } 219 220 private class NativeS3FsOutputStream extends OutputStream { 221 222 private Configuration conf; 223 private String key; 224 private File backupFile; 225 private OutputStream backupStream; 226 private MessageDigest digest; 227 private boolean closed; 228 229 public NativeS3FsOutputStream(Configuration conf, 230 NativeFileSystemStore store, String key, Progressable progress, 231 int bufferSize) throws IOException { 232 this.conf = conf; 233 this.key = key; 234 this.backupFile = newBackupFile(); 235 LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'"); 236 try { 237 this.digest = MessageDigest.getInstance("MD5"); 238 this.backupStream = new BufferedOutputStream(new DigestOutputStream( 239 new FileOutputStream(backupFile), this.digest)); 240 } catch (NoSuchAlgorithmException e) { 241 LOG.warn("Cannot load MD5 digest algorithm," + 242 "skipping message integrity check.", e); 243 this.backupStream = new BufferedOutputStream( 244 new FileOutputStream(backupFile)); 245 } 246 } 247 248 private File newBackupFile() throws IOException { 249 File dir = new File(conf.get("fs.s3.buffer.dir")); 250 if (!dir.mkdirs() && !dir.exists()) { 251 throw new IOException("Cannot create S3 buffer directory: " + dir); 252 } 253 File result = File.createTempFile("output-", ".tmp", dir); 254 result.deleteOnExit(); 255 return result; 256 } 257 258 @Override 259 public void flush() throws IOException { 260 backupStream.flush(); 261 } 262 263 @Override 264 public synchronized void close() throws IOException { 265 if (closed) { 266 return; 267 } 268 269 backupStream.close(); 270 LOG.info("OutputStream for key '{}' closed. Now beginning upload", key); 271 272 try { 273 byte[] md5Hash = digest == null ? null : digest.digest(); 274 store.storeFile(key, backupFile, md5Hash); 275 } finally { 276 if (!backupFile.delete()) { 277 LOG.warn("Could not delete temporary s3n file: " + backupFile); 278 } 279 super.close(); 280 closed = true; 281 } 282 LOG.info("OutputStream for key '{}' upload complete", key); 283 } 284 285 @Override 286 public void write(int b) throws IOException { 287 backupStream.write(b); 288 } 289 290 @Override 291 public void write(byte[] b, int off, int len) throws IOException { 292 backupStream.write(b, off, len); 293 } 294 } 295 296 private URI uri; 297 private NativeFileSystemStore store; 298 private Path workingDir; 299 300 public NativeS3FileSystem() { 301 // set store in initialize() 302 } 303 304 public NativeS3FileSystem(NativeFileSystemStore store) { 305 this.store = store; 306 } 307 308 /** 309 * Return the protocol scheme for the FileSystem. 310 * <p/> 311 * 312 * @return <code>s3n</code> 313 */ 314 @Override 315 public String getScheme() { 316 return "s3n"; 317 } 318 319 @Override 320 public void initialize(URI uri, Configuration conf) throws IOException { 321 super.initialize(uri, conf); 322 if (store == null) { 323 store = createDefaultStore(conf); 324 } 325 store.initialize(uri, conf); 326 setConf(conf); 327 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 328 this.workingDir = 329 new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory()); 330 } 331 332 private static NativeFileSystemStore createDefaultStore(Configuration conf) { 333 NativeFileSystemStore store = new Jets3tNativeFileSystemStore(); 334 335 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 336 conf.getInt("fs.s3.maxRetries", 4), 337 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); 338 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 339 new HashMap<Class<? extends Exception>, RetryPolicy>(); 340 exceptionToPolicyMap.put(IOException.class, basePolicy); 341 exceptionToPolicyMap.put(S3Exception.class, basePolicy); 342 343 RetryPolicy methodPolicy = RetryPolicies.retryByException( 344 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 345 Map<String, RetryPolicy> methodNameToPolicyMap = 346 new HashMap<String, RetryPolicy>(); 347 methodNameToPolicyMap.put("storeFile", methodPolicy); 348 methodNameToPolicyMap.put("rename", methodPolicy); 349 350 return (NativeFileSystemStore) 351 RetryProxy.create(NativeFileSystemStore.class, store, 352 methodNameToPolicyMap); 353 } 354 355 private static String pathToKey(Path path) { 356 if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { 357 // allow uris without trailing slash after bucket to refer to root, 358 // like s3n://mybucket 359 return ""; 360 } 361 if (!path.isAbsolute()) { 362 throw new IllegalArgumentException("Path must be absolute: " + path); 363 } 364 String ret = path.toUri().getPath().substring(1); // remove initial slash 365 if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) { 366 ret = ret.substring(0, ret.length() -1); 367 } 368 return ret; 369 } 370 371 private static Path keyToPath(String key) { 372 return new Path("/" + key); 373 } 374 375 private Path makeAbsolute(Path path) { 376 if (path.isAbsolute()) { 377 return path; 378 } 379 return new Path(workingDir, path); 380 } 381 382 /** This optional operation is not yet supported. */ 383 @Override 384 public FSDataOutputStream append(Path f, int bufferSize, 385 Progressable progress) throws IOException { 386 throw new IOException("Not supported"); 387 } 388 389 @Override 390 public FSDataOutputStream create(Path f, FsPermission permission, 391 boolean overwrite, int bufferSize, short replication, long blockSize, 392 Progressable progress) throws IOException { 393 394 if (exists(f) && !overwrite) { 395 throw new FileAlreadyExistsException("File already exists: " + f); 396 } 397 398 if(LOG.isDebugEnabled()) { 399 LOG.debug("Creating new file '" + f + "' in S3"); 400 } 401 Path absolutePath = makeAbsolute(f); 402 String key = pathToKey(absolutePath); 403 return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store, 404 key, progress, bufferSize), statistics); 405 } 406 407 @Override 408 public boolean delete(Path f, boolean recurse) throws IOException { 409 FileStatus status; 410 try { 411 status = getFileStatus(f); 412 } catch (FileNotFoundException e) { 413 if(LOG.isDebugEnabled()) { 414 LOG.debug("Delete called for '" + f + 415 "' but file does not exist, so returning false"); 416 } 417 return false; 418 } 419 Path absolutePath = makeAbsolute(f); 420 String key = pathToKey(absolutePath); 421 if (status.isDirectory()) { 422 if (!recurse && listStatus(f).length > 0) { 423 throw new IOException("Can not delete " + f + " as is a not empty directory and recurse option is false"); 424 } 425 426 createParent(f); 427 428 if(LOG.isDebugEnabled()) { 429 LOG.debug("Deleting directory '" + f + "'"); 430 } 431 String priorLastKey = null; 432 do { 433 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true); 434 for (FileMetadata file : listing.getFiles()) { 435 store.delete(file.getKey()); 436 } 437 priorLastKey = listing.getPriorLastKey(); 438 } while (priorLastKey != null); 439 440 try { 441 store.delete(key + FOLDER_SUFFIX); 442 } catch (FileNotFoundException e) { 443 //this is fine, we don't require a marker 444 } 445 } else { 446 if(LOG.isDebugEnabled()) { 447 LOG.debug("Deleting file '" + f + "'"); 448 } 449 createParent(f); 450 store.delete(key); 451 } 452 return true; 453 } 454 455 @Override 456 public FileStatus getFileStatus(Path f) throws IOException { 457 Path absolutePath = makeAbsolute(f); 458 String key = pathToKey(absolutePath); 459 460 if (key.length() == 0) { // root always exists 461 return newDirectory(absolutePath); 462 } 463 464 if(LOG.isDebugEnabled()) { 465 LOG.debug("getFileStatus retrieving metadata for key '" + key + "'"); 466 } 467 FileMetadata meta = store.retrieveMetadata(key); 468 if (meta != null) { 469 if(LOG.isDebugEnabled()) { 470 LOG.debug("getFileStatus returning 'file' for key '" + key + "'"); 471 } 472 return newFile(meta, absolutePath); 473 } 474 if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) { 475 if(LOG.isDebugEnabled()) { 476 LOG.debug("getFileStatus returning 'directory' for key '" + key + 477 "' as '" + key + FOLDER_SUFFIX + "' exists"); 478 } 479 return newDirectory(absolutePath); 480 } 481 482 if(LOG.isDebugEnabled()) { 483 LOG.debug("getFileStatus listing key '" + key + "'"); 484 } 485 PartialListing listing = store.list(key, 1); 486 if (listing.getFiles().length > 0 || 487 listing.getCommonPrefixes().length > 0) { 488 if(LOG.isDebugEnabled()) { 489 LOG.debug("getFileStatus returning 'directory' for key '" + key + 490 "' as it has contents"); 491 } 492 return newDirectory(absolutePath); 493 } 494 495 if(LOG.isDebugEnabled()) { 496 LOG.debug("getFileStatus could not find key '" + key + "'"); 497 } 498 throw new FileNotFoundException("No such file or directory '" + absolutePath + "'"); 499 } 500 501 @Override 502 public URI getUri() { 503 return uri; 504 } 505 506 /** 507 * <p> 508 * If <code>f</code> is a file, this method will make a single call to S3. 509 * If <code>f</code> is a directory, this method will make a maximum of 510 * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of 511 * files and directories contained directly in <code>f</code>. 512 * </p> 513 */ 514 @Override 515 public FileStatus[] listStatus(Path f) throws IOException { 516 517 Path absolutePath = makeAbsolute(f); 518 String key = pathToKey(absolutePath); 519 520 if (key.length() > 0) { 521 FileMetadata meta = store.retrieveMetadata(key); 522 if (meta != null) { 523 return new FileStatus[] { newFile(meta, absolutePath) }; 524 } 525 } 526 527 URI pathUri = absolutePath.toUri(); 528 Set<FileStatus> status = new TreeSet<FileStatus>(); 529 String priorLastKey = null; 530 do { 531 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false); 532 for (FileMetadata fileMetadata : listing.getFiles()) { 533 Path subpath = keyToPath(fileMetadata.getKey()); 534 String relativePath = pathUri.relativize(subpath.toUri()).getPath(); 535 536 if (fileMetadata.getKey().equals(key + "/")) { 537 // this is just the directory we have been asked to list 538 } 539 else if (relativePath.endsWith(FOLDER_SUFFIX)) { 540 status.add(newDirectory(new Path( 541 absolutePath, 542 relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX))))); 543 } 544 else { 545 status.add(newFile(fileMetadata, subpath)); 546 } 547 } 548 for (String commonPrefix : listing.getCommonPrefixes()) { 549 Path subpath = keyToPath(commonPrefix); 550 String relativePath = pathUri.relativize(subpath.toUri()).getPath(); 551 status.add(newDirectory(new Path(absolutePath, relativePath))); 552 } 553 priorLastKey = listing.getPriorLastKey(); 554 } while (priorLastKey != null); 555 556 if (status.isEmpty() && 557 key.length() > 0 && 558 store.retrieveMetadata(key + FOLDER_SUFFIX) == null) { 559 throw new FileNotFoundException("File " + f + " does not exist."); 560 } 561 562 return status.toArray(new FileStatus[status.size()]); 563 } 564 565 private FileStatus newFile(FileMetadata meta, Path path) { 566 return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(), 567 meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory())); 568 } 569 570 private FileStatus newDirectory(Path path) { 571 return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory())); 572 } 573 574 @Override 575 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 576 Path absolutePath = makeAbsolute(f); 577 List<Path> paths = new ArrayList<Path>(); 578 do { 579 paths.add(0, absolutePath); 580 absolutePath = absolutePath.getParent(); 581 } while (absolutePath != null); 582 583 boolean result = true; 584 for (Path path : paths) { 585 result &= mkdir(path); 586 } 587 return result; 588 } 589 590 private boolean mkdir(Path f) throws IOException { 591 try { 592 FileStatus fileStatus = getFileStatus(f); 593 if (fileStatus.isFile()) { 594 throw new FileAlreadyExistsException(String.format( 595 "Can't make directory for path '%s' since it is a file.", f)); 596 597 } 598 } catch (FileNotFoundException e) { 599 if(LOG.isDebugEnabled()) { 600 LOG.debug("Making dir '" + f + "' in S3"); 601 } 602 String key = pathToKey(f) + FOLDER_SUFFIX; 603 store.storeEmptyFile(key); 604 } 605 return true; 606 } 607 608 @Override 609 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 610 FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist 611 if (fs.isDirectory()) { 612 throw new FileNotFoundException("'" + f + "' is a directory"); 613 } 614 LOG.info("Opening '" + f + "' for reading"); 615 Path absolutePath = makeAbsolute(f); 616 String key = pathToKey(absolutePath); 617 return new FSDataInputStream(new BufferedFSInputStream( 618 new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize)); 619 } 620 621 // rename() and delete() use this method to ensure that the parent directory 622 // of the source does not vanish. 623 private void createParent(Path path) throws IOException { 624 Path parent = path.getParent(); 625 if (parent != null) { 626 String key = pathToKey(makeAbsolute(parent)); 627 if (key.length() > 0) { 628 store.storeEmptyFile(key + FOLDER_SUFFIX); 629 } 630 } 631 } 632 633 634 @Override 635 public boolean rename(Path src, Path dst) throws IOException { 636 637 String srcKey = pathToKey(makeAbsolute(src)); 638 639 if (srcKey.length() == 0) { 640 // Cannot rename root of file system 641 return false; 642 } 643 644 final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - "; 645 646 // Figure out the final destination 647 String dstKey; 648 try { 649 boolean dstIsFile = getFileStatus(dst).isFile(); 650 if (dstIsFile) { 651 if(LOG.isDebugEnabled()) { 652 LOG.debug(debugPreamble + 653 "returning false as dst is an already existing file"); 654 } 655 return false; 656 } else { 657 if(LOG.isDebugEnabled()) { 658 LOG.debug(debugPreamble + "using dst as output directory"); 659 } 660 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); 661 } 662 } catch (FileNotFoundException e) { 663 if(LOG.isDebugEnabled()) { 664 LOG.debug(debugPreamble + "using dst as output destination"); 665 } 666 dstKey = pathToKey(makeAbsolute(dst)); 667 try { 668 if (getFileStatus(dst.getParent()).isFile()) { 669 if(LOG.isDebugEnabled()) { 670 LOG.debug(debugPreamble + 671 "returning false as dst parent exists and is a file"); 672 } 673 return false; 674 } 675 } catch (FileNotFoundException ex) { 676 if(LOG.isDebugEnabled()) { 677 LOG.debug(debugPreamble + 678 "returning false as dst parent does not exist"); 679 } 680 return false; 681 } 682 } 683 684 boolean srcIsFile; 685 try { 686 srcIsFile = getFileStatus(src).isFile(); 687 } catch (FileNotFoundException e) { 688 if(LOG.isDebugEnabled()) { 689 LOG.debug(debugPreamble + "returning false as src does not exist"); 690 } 691 return false; 692 } 693 if (srcIsFile) { 694 if(LOG.isDebugEnabled()) { 695 LOG.debug(debugPreamble + 696 "src is file, so doing copy then delete in S3"); 697 } 698 store.copy(srcKey, dstKey); 699 store.delete(srcKey); 700 } else { 701 if(LOG.isDebugEnabled()) { 702 LOG.debug(debugPreamble + "src is directory, so copying contents"); 703 } 704 store.storeEmptyFile(dstKey + FOLDER_SUFFIX); 705 706 List<String> keysToDelete = new ArrayList<String>(); 707 String priorLastKey = null; 708 do { 709 PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true); 710 for (FileMetadata file : listing.getFiles()) { 711 keysToDelete.add(file.getKey()); 712 store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length())); 713 } 714 priorLastKey = listing.getPriorLastKey(); 715 } while (priorLastKey != null); 716 717 if(LOG.isDebugEnabled()) { 718 LOG.debug(debugPreamble + 719 "all files in src copied, now removing src files"); 720 } 721 for (String key: keysToDelete) { 722 store.delete(key); 723 } 724 725 try { 726 store.delete(srcKey + FOLDER_SUFFIX); 727 } catch (FileNotFoundException e) { 728 //this is fine, we don't require a marker 729 } 730 if(LOG.isDebugEnabled()) { 731 LOG.debug(debugPreamble + "done"); 732 } 733 } 734 735 return true; 736 } 737 738 @Override 739 public long getDefaultBlockSize() { 740 return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024); 741 } 742 743 /** 744 * Set the working directory to the given directory. 745 */ 746 @Override 747 public void setWorkingDirectory(Path newDir) { 748 workingDir = newDir; 749 } 750 751 @Override 752 public Path getWorkingDirectory() { 753 return workingDir; 754 } 755 756 @Override 757 public String getCanonicalServiceName() { 758 // Does not support Token 759 return null; 760 } 761 }