/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3native;

import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3.S3Exception;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link FileSystem} for reading and writing files stored on
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
 * stores files on S3 in their
 * native form so they can be read by other S3 tools.
 * <p>
 * A note about directories. S3 of course has no "native" support for them.
 * The idiom we choose then is: for any directory created by this class,
 * we use an empty object "#{dirpath}_$folder$" as a marker.
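 * For illustration (the path below is hypothetical, not part of the API):
 * a directory created at <code>/data/logs</code> is marked by an empty
 * object stored under the key <code>data/logs_$folder$</code>.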
 * Further, to interoperate with other S3 tools, we also accept the following:
 * <ul>
 *   <li>an object "#{dirpath}/" denoting a directory marker</li>
 *   <li>
 *     if there exist any objects with the prefix "#{dirpath}/", then the
 *     directory is said to exist
 *   </li>
 *   <li>
 *     if both a file with the name of a directory and a marker for that
 *     directory exist, then the <i>file masks the directory</i>, and the
 *     directory is never returned.
 *   </li>
 * </ul>
 *
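 * <p>
 * A minimal usage sketch. The bucket name and paths are illustrative, and
 * the credential settings are assumed to be the usual
 * <code>fs.s3n.awsAccessKeyId</code> / <code>fs.s3n.awsSecretAccessKey</code>
 * properties:
 * <pre>{@code
 * Configuration conf = new Configuration();
 * conf.set("fs.s3n.awsAccessKeyId", "<your access key>");
 * conf.set("fs.s3n.awsSecretAccessKey", "<your secret key>");
 * FileSystem fs = FileSystem.get(URI.create("s3n://mybucket"), conf);
 * try (FSDataOutputStream out = fs.create(new Path("/data/hello.txt"))) {
 *   out.writeUTF("hello");
 * }
 * for (FileStatus status : fs.listStatus(new Path("/data"))) {
 *   System.out.println(status.getPath());
 * }
 * }</pre>
 *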
 * @see org.apache.hadoop.fs.s3.S3FileSystem
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class NativeS3FileSystem extends FileSystem {

  public static final Logger LOG =
      LoggerFactory.getLogger(NativeS3FileSystem.class);

  private static final String FOLDER_SUFFIX = "_$folder$";
  static final String PATH_DELIMITER = Path.SEPARATOR;
  private static final int S3_MAX_LISTING_LENGTH = 1000;

  static class NativeS3FsInputStream extends FSInputStream {

    private NativeFileSystemStore store;
    private Statistics statistics;
    private InputStream in;
    private final String key;
    private long pos = 0;

    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
      Preconditions.checkNotNull(in, "Null input stream");
      this.store = store;
      this.statistics = statistics;
      this.in = in;
      this.key = key;
    }

    @Override
    public synchronized int read() throws IOException {
      int result;
      try {
        result = in.read();
      } catch (IOException e) {
        LOG.info("Received IOException while reading '{}', attempting to reopen",
            key);
        LOG.debug("{}", e, e);
        try {
          reopen(pos);
          result = in.read();
        } catch (EOFException eof) {
          LOG.debug("EOF on input stream read: {}", eof, eof);
          result = -1;
        }
      }
      if (result != -1) {
        pos++;
      }
      if (statistics != null && result != -1) {
        statistics.incrementBytesRead(1);
      }
      return result;
    }

    @Override
    public synchronized int read(byte[] b, int off, int len)
        throws IOException {
      if (in == null) {
        throw new EOFException("Cannot read closed stream");
      }
      int result = -1;
      try {
        result = in.read(b, off, len);
      } catch (EOFException eof) {
        throw eof;
      } catch (IOException e) {
        LOG.info("Received IOException while reading '{}'," +
            " attempting to reopen.", key);
        reopen(pos);
        result = in.read(b, off, len);
      }
      if (result > 0) {
        pos += result;
      }
      if (statistics != null && result > 0) {
        statistics.incrementBytesRead(result);
      }
      return result;
    }

    @Override
    public synchronized void close() throws IOException {
      closeInnerStream();
    }

    /**
     * Close the inner stream if not null. Even if an exception
     * is raised during the close, the field is set to null.
     */
    private void closeInnerStream() {
      IOUtils.closeStream(in);
      in = null;
    }

    /**
     * Reopen a new input stream with the specified position.
     * @param pos the position to reopen a new stream
     * @throws IOException
     */
    private synchronized void reopen(long pos) throws IOException {
      LOG.debug("Reopening key '{}' for reading at position '{}'", key, pos);
      InputStream newStream = store.retrieve(key, pos);
      updateInnerStream(newStream, pos);
    }

    /**
     * Update inner stream with a new stream and position.
     * @param newStream new stream - must not be null
     * @param newpos new position
     * @throws IOException IO exception on a failure to close the existing
     * stream.
     */
    private synchronized void updateInnerStream(InputStream newStream, long newpos) throws IOException {
      Preconditions.checkNotNull(newStream, "Null newstream argument");
      closeInnerStream();
      in = newStream;
      this.pos = newpos;
    }

    @Override
    public synchronized void seek(long newpos) throws IOException {
      if (newpos < 0) {
        throw new EOFException(
            FSExceptionMessages.NEGATIVE_SEEK);
      }
      if (pos != newpos) {
        // the seek is attempting to move the current position
        reopen(newpos);
      }
    }

    @Override
    public synchronized long getPos() throws IOException {
      return pos;
    }

    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;
    }
  }

  private class NativeS3FsOutputStream extends OutputStream {

    private Configuration conf;
    private String key;
    private File backupFile;
    private OutputStream backupStream;
    private MessageDigest digest;
    private boolean closed;
    private LocalDirAllocator lDirAlloc;

    public NativeS3FsOutputStream(Configuration conf,
        NativeFileSystemStore store, String key, Progressable progress,
        int bufferSize) throws IOException {
      this.conf = conf;
      this.key = key;
      this.backupFile = newBackupFile();
      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
      try {
        this.digest = MessageDigest.getInstance("MD5");
        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
            new FileOutputStream(backupFile), this.digest));
      } catch (NoSuchAlgorithmException e) {
        LOG.warn("Cannot load MD5 digest algorithm, " +
            "skipping message integrity check.", e);
        this.backupStream = new BufferedOutputStream(
            new FileOutputStream(backupFile));
      }
    }

    private File newBackupFile() throws IOException {
      if (lDirAlloc == null) {
        lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir");
      }
      File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
      result.deleteOnExit();
      return result;
    }

    @Override
    public void flush() throws IOException {
      backupStream.flush();
    }

    @Override
    public synchronized void close() throws IOException {
      if (closed) {
        return;
      }

      backupStream.close();
      LOG.info("OutputStream for key '{}' closed. Now beginning upload", key);

      try {
        byte[] md5Hash = digest == null ? null : digest.digest();
        store.storeFile(key, backupFile, md5Hash);
      } finally {
        if (!backupFile.delete()) {
          LOG.warn("Could not delete temporary s3n file: " + backupFile);
        }
        super.close();
        closed = true;
      }
      LOG.info("OutputStream for key '{}' upload complete", key);
    }

    @Override
    public void write(int b) throws IOException {
      backupStream.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      backupStream.write(b, off, len);
    }
  }

  private URI uri;
  private NativeFileSystemStore store;
  private Path workingDir;

  public NativeS3FileSystem() {
    // set store in initialize()
  }

  public NativeS3FileSystem(NativeFileSystemStore store) {
    this.store = store;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>s3n</code>
   */
  @Override
  public String getScheme() {
    return "s3n";
  }

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = S3xLoginHelper.buildFSURI(uri);
    this.workingDir =
        new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
  }

  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt("fs.s3.maxRetries", 4),
        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap =
        new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("storeFile", methodPolicy);
    methodNameToPolicyMap.put("rename", methodPolicy);

    return (NativeFileSystemStore)
        RetryProxy.create(NativeFileSystemStore.class, store,
            methodNameToPolicyMap);
  }

  private static String pathToKey(Path path) {
    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
      // allow uris without trailing slash after bucket to refer to root,
      // like s3n://mybucket
      return "";
    }
    if (!path.isAbsolute()) {
      throw new IllegalArgumentException("Path must be absolute: " + path);
    }
    String ret = path.toUri().getPath().substring(1); // remove initial slash
    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
      ret = ret.substring(0, ret.length() - 1);
    }
    return ret;
  }

  private static Path keyToPath(String key) {
    return new Path("/" + key);
  }

  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * Check that a Path belongs to this FileSystem.
   * Unlike the superclass, this version does not look at authority,
   * only hostnames.
   * @param path to check
   * @throws IllegalArgumentException if there is an FS mismatch
   */
  @Override
  protected void checkPath(Path path) {
    S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
  }

  @Override
  protected URI canonicalizeUri(URI rawUri) {
    return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {

    if (exists(f) && !overwrite) {
      throw new FileAlreadyExistsException("File already exists: " + f);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating new file '" + f + "' in S3");
    }
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
        key, progress, bufferSize), statistics);
  }

  @Override
  public boolean delete(Path f, boolean recurse) throws IOException {
    FileStatus status;
    try {
      status = getFileStatus(f);
    } catch (FileNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Delete called for '" + f +
            "' but file does not exist, so returning false");
      }
      return false;
    }
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    if (status.isDirectory()) {
      if (!recurse && listStatus(f).length > 0) {
        throw new IOException("Cannot delete " + f +
            " as it is not an empty directory and the recurse option is false");
      }

      createParent(f);

      if (LOG.isDebugEnabled()) {
        LOG.debug("Deleting directory '" + f + "'");
      }
      String priorLastKey = null;
      do {
        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
        for (FileMetadata file : listing.getFiles()) {
          store.delete(file.getKey());
        }
        priorLastKey = listing.getPriorLastKey();
      } while (priorLastKey != null);

      try {
        store.delete(key + FOLDER_SUFFIX);
      } catch (FileNotFoundException e) {
        //this is fine, we don't require a marker
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Deleting file '" + f + "'");
      }
      createParent(f);
      store.delete(key);
    }
    return true;
  }

  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() == 0) { // root always exists
      return newDirectory(absolutePath);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
    }
    FileMetadata meta = store.retrieveMetadata(key);
    if (meta != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
      }
      return newFile(meta, absolutePath);
    }
    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("getFileStatus returning 'directory' for key '" + key +
            "' as '" + key + FOLDER_SUFFIX + "' exists");
      }
      return newDirectory(absolutePath);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("getFileStatus listing key '" + key + "'");
    }
    PartialListing listing = store.list(key, 1);
    if (listing.getFiles().length > 0 ||
        listing.getCommonPrefixes().length > 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("getFileStatus returning 'directory' for key '" + key +
            "' as it has contents");
      }
      return newDirectory(absolutePath);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("getFileStatus could not find key '" + key + "'");
    }
    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
  }

  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * <p>
   * If <code>f</code> is a file, this method will make a single call to S3.
   * If <code>f</code> is a directory, this method will make a maximum of
   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
   * files and directories contained directly in <code>f</code>.
   * </p>
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() > 0) {
      FileMetadata meta = store.retrieveMetadata(key);
      if (meta != null) {
        return new FileStatus[] { newFile(meta, absolutePath) };
      }
    }

    URI pathUri = absolutePath.toUri();
    Set<FileStatus> status = new TreeSet<FileStatus>();
    String priorLastKey = null;
    do {
      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
      for (FileMetadata fileMetadata : listing.getFiles()) {
        Path subpath = keyToPath(fileMetadata.getKey());
        String relativePath = pathUri.relativize(subpath.toUri()).getPath();

        if (fileMetadata.getKey().equals(key + "/")) {
          // this is just the directory we have been asked to list
        }
        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
          status.add(newDirectory(new Path(
              absolutePath,
              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
        }
        else {
          status.add(newFile(fileMetadata, subpath));
        }
      }
      for (String commonPrefix : listing.getCommonPrefixes()) {
        Path subpath = keyToPath(commonPrefix);
        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
        // sometimes the common prefix includes the base dir (HADOOP-13830).
        // avoid that problem by detecting it and keeping it out
        // of the list
        if (!relativePath.isEmpty()) {
          status.add(newDirectory(new Path(absolutePath, relativePath)));
        }
      }
      priorLastKey = listing.getPriorLastKey();
    } while (priorLastKey != null);

    if (status.isEmpty() &&
        key.length() > 0 &&
        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }

    return status.toArray(new FileStatus[status.size()]);
  }

  private FileStatus newFile(FileMetadata meta, Path path) {
    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
        meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  private FileStatus newDirectory(Path path) {
    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(f);
    List<Path> paths = new ArrayList<Path>();
    do {
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (Path path : paths) {
      result &= mkdir(path);
    }
    return result;
  }

  private boolean mkdir(Path f) throws IOException {
    try {
      FileStatus fileStatus = getFileStatus(f);
      if (fileStatus.isFile()) {
        throw new FileAlreadyExistsException(String.format(
            "Can't make directory for path '%s' since it is a file.", f));
      }
    } catch (FileNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Making dir '" + f + "' in S3");
      }
      String key = pathToKey(f) + FOLDER_SUFFIX;
      store.storeEmptyFile(key);
    }
    return true;
  }

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
    if (fs.isDirectory()) {
      throw new FileNotFoundException("'" + f + "' is a directory");
    }
    LOG.info("Opening '" + f + "' for reading");
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    return new FSDataInputStream(new BufferedFSInputStream(
        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
  }

  // rename() and delete() use this method to ensure that the parent directory
  // of the source does not vanish.
  private void createParent(Path path) throws IOException {
    Path parent = path.getParent();
    if (parent != null) {
      String key = pathToKey(makeAbsolute(parent));
      if (key.length() > 0) {
        store.storeEmptyFile(key + FOLDER_SUFFIX);
      }
    }
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {

    String srcKey = pathToKey(makeAbsolute(src));

    if (srcKey.length() == 0) {
      // Cannot rename root of file system
      return false;
    }

    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";

    // Figure out the final destination
    String dstKey;
    try {
      boolean dstIsFile = getFileStatus(dst).isFile();
      if (dstIsFile) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(debugPreamble +
              "returning false as dst is an already existing file");
        }
        return false;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug(debugPreamble + "using dst as output directory");
        }
        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
      }
    } catch (FileNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble + "using dst as output destination");
      }
      dstKey = pathToKey(makeAbsolute(dst));
      try {
        if (getFileStatus(dst.getParent()).isFile()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(debugPreamble +
                "returning false as dst parent exists and is a file");
          }
          return false;
        }
      } catch (FileNotFoundException ex) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(debugPreamble +
              "returning false as dst parent does not exist");
        }
        return false;
      }
    }

    boolean srcIsFile;
    try {
      srcIsFile = getFileStatus(src).isFile();
    } catch (FileNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble + "returning false as src does not exist");
      }
      return false;
    }
    if (srcIsFile) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble +
            "src is file, so doing copy then delete in S3");
      }
      store.copy(srcKey, dstKey);
      store.delete(srcKey);
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble + "src is directory, so copying contents");
      }
      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);

      List<String> keysToDelete = new ArrayList<String>();
      String priorLastKey = null;
      do {
        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
        for (FileMetadata file : listing.getFiles()) {
          keysToDelete.add(file.getKey());
          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
        }
        priorLastKey = listing.getPriorLastKey();
      } while (priorLastKey != null);

      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble +
            "all files in src copied, now removing src files");
      }
      for (String key : keysToDelete) {
        store.delete(key);
      }

      try {
        store.delete(srcKey + FOLDER_SUFFIX);
      } catch (FileNotFoundException e) {
        //this is fine, we don't require a marker
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble + "done");
      }
    }

    return true;
  }

  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
  }

  /**
   * Set the working directory to the given directory.
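   * @param newDir the new working directory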
   */
  @Override
  public void setWorkingDirectory(Path newDir) {
    workingDir = newDir;
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }
}