/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3native;

import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3.S3Exception;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A {@link FileSystem} for reading and writing files stored on
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
 * stores files on S3 in their
 * native form so they can be read by other S3 tools.
 * </p>
 * <p>
 * A note about directories. S3 of course has no "native" support for them.
 * The idiom we choose then is: for any directory created by this class,
 * we use an empty object "#{dirpath}_$folder$" as a marker.
 * Further, to interoperate with other S3 tools, we also accept the following:
 * </p>
 * <ul>
 *   <li>an object "#{dirpath}/" denoting a directory marker</li>
 *   <li>if there exist any objects with the prefix "#{dirpath}/", then the
 *       directory is said to exist</li>
 *   <li>if both a file with the name of a directory and a marker for that
 *       directory exist, then the <em>file masks the directory</em>, and the
 *       directory is never returned.</li>
 * </ul>
 * @see org.apache.hadoop.fs.s3.S3FileSystem
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class NativeS3FileSystem extends FileSystem {

  public static final Log LOG =
    LogFactory.getLog(NativeS3FileSystem.class);

  /** Suffix appended to a directory key to form its empty marker object. */
  private static final String FOLDER_SUFFIX = "_$folder$";
  static final String PATH_DELIMITER = Path.SEPARATOR;
  /** Page size passed to {@code store.list(...)} when paging through keys. */
  private static final int S3_MAX_LISTING_LENGTH = 1000;

  /**
   * Input stream over a single S3 object. Tracks the number of bytes consumed
   * so that a failed read can be retried once on a freshly opened stream
   * positioned at the same offset.
   */
  static class NativeS3FsInputStream extends FSInputStream {

    private final NativeFileSystemStore store;
    private final Statistics statistics;
    private InputStream in;
    private final String key;
    private long pos = 0;

    public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
      this.store = store;
      this.statistics = statistics;
      this.in = in;
      this.key = key;
    }

    /**
     * Reads one byte. If the underlying stream throws, the object is reopened
     * at the current position and the read is attempted once more.
     *
     * @return the byte read, or -1 at end of stream
     * @throws IOException if the retry fails as well
     */
    @Override
    public synchronized int read() throws IOException {
      int result;
      try {
        result = in.read();
      } catch (IOException e) {
        LOG.info("Received IOException while reading '" + key + "', attempting to reopen.", e);
        reopen(pos);
        result = in.read();
      }
      if (result != -1) {
        pos++;
        if (statistics != null) {
          statistics.incrementBytesRead(1);
        }
      }
      return result;
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
     * If the underlying stream throws, the object is reopened at the current
     * position and the read is attempted once more.
     *
     * @return the value returned by the underlying stream's read
     * @throws IOException if the retry fails as well
     */
    @Override
    public synchronized int read(byte[] b, int off, int len)
      throws IOException {

      int result;
      try {
        result = in.read(b, off, len);
      } catch (IOException e) {
        LOG.info("Received IOException while reading '" + key + "', attempting to reopen.", e);
        reopen(pos);
        result = in.read(b, off, len);
      }
      if (result > 0) {
        pos += result;
        if (statistics != null) {
          statistics.incrementBytesRead(result);
        }
      }
      return result;
    }

    @Override
    public void close() throws IOException {
      in.close();
    }

    /**
     * Moves the stream to {@code pos}. A seek to the current position is a
     * no-op; any other target reopens the object at the requested offset.
     *
     * @param pos the absolute offset to continue reading from
     * @throws EOFException if {@code pos} is negative
     * @throws IOException if the object cannot be reopened
     */
    @Override
    public synchronized void seek(long pos) throws IOException {
      if (pos < 0) {
        throw new EOFException("Cannot seek to a negative position: " + pos);
      }
      if (pos != this.pos) {
        reopen(pos);
      }
    }

    /**
     * Closes the current stream and opens a new one at {@code newPos}.
     * A failure to close the old stream is ignored so that a broken stream
     * cannot prevent the retry in the read methods from taking place.
     */
    private void reopen(long newPos) throws IOException {
      try {
        in.close();
      } catch (IOException e) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Ignoring failure to close stream for key '" + key
              + "' before reopening", e);
        }
      }
      LOG.info("Opening key '" + key + "' for reading at position '" + newPos + "'");
      in = store.retrieve(key, newPos);
      this.pos = newPos;
    }

    @Override
    public synchronized long getPos() throws IOException {
      return pos;
    }

    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;
    }
  }

  /**
   * Output stream that buffers everything written to a local temporary file
   * and hands that file to the store when the stream is closed.
   */
  private class NativeS3FsOutputStream extends OutputStream {

    private final Configuration conf;
    private final String key;
    private final File backupFile;
    private OutputStream backupStream;
    private MessageDigest digest;
    private boolean closed;

    public NativeS3FsOutputStream(Configuration conf,
        NativeFileSystemStore store, String key, Progressable progress,
        int bufferSize) throws IOException {
      this.conf = conf;
      this.key = key;
      this.backupFile = newBackupFile();
      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
      try {
        this.digest = MessageDigest.getInstance("MD5");
        this.backupStream = new BufferedOutputStream(new DigestOutputStream(
            new FileOutputStream(backupFile), this.digest));
      } catch (NoSuchAlgorithmException e) {
        // digest stays null, so close() passes a null hash to the store
        LOG.warn("Cannot load MD5 digest algorithm," +
            "skipping message integrity check.", e);
        this.backupStream = new BufferedOutputStream(
            new FileOutputStream(backupFile));
      }
    }

    /**
     * Creates the temporary file under the directory named by the
     * {@code fs.s3.buffer.dir} configuration key, creating the directory
     * first if necessary.
     *
     * @throws IOException if the key is unset or the directory cannot be made
     */
    private File newBackupFile() throws IOException {
      String bufferDir = conf.get("fs.s3.buffer.dir");
      if (bufferDir == null) {
        throw new IOException(
            "S3 buffer directory is not configured: fs.s3.buffer.dir");
      }
      File dir = new File(bufferDir);
      if (!dir.mkdirs() && !dir.exists()) {
        throw new IOException("Cannot create S3 buffer directory: " + dir);
      }
      File result = File.createTempFile("output-", ".tmp", dir);
      result.deleteOnExit();
      return result;
    }

    @Override
    public void flush() throws IOException {
      backupStream.flush();
    }

    /**
     * Closes the temporary file and stores it under this stream's key.
     * The temporary file is deleted and the stream marked closed whether or
     * not closing or storing succeeds. Calling this again is a no-op.
     */
    @Override
    public synchronized void close() throws IOException {
      if (closed) {
        return;
      }

      try {
        // inside the try so a failing close still removes the temp file
        backupStream.close();
        LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");

        byte[] md5Hash = digest == null ? null : digest.digest();
        store.storeFile(key, backupFile, md5Hash);
      } finally {
        if (!backupFile.delete()) {
          LOG.warn("Could not delete temporary s3n file: " + backupFile);
        }
        super.close();
        closed = true;
      }
      LOG.info("OutputStream for key '" + key + "' upload complete");
    }

    @Override
    public void write(int b) throws IOException {
      backupStream.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      backupStream.write(b, off, len);
    }
  }

  private URI uri;
  private NativeFileSystemStore store;
  private Path workingDir;

  public NativeS3FileSystem() {
    // set store in initialize()
  }

  public NativeS3FileSystem(NativeFileSystemStore store) {
    this.store = store;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>s3n</code>
   */
  @Override
  public String getScheme() {
    return "s3n";
  }

  /**
   * Initializes the store (creating the default one if none was supplied to
   * the constructor), records the scheme and authority of {@code uri} as this
   * file system's URI, and sets the working directory to
   * <code>/user/&lt;user.name&gt;</code>.
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir =
      new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
  }

  /**
   * Builds a {@link Jets3tNativeFileSystemStore} wrapped in a retry proxy.
   * Only <code>storeFile</code> and <code>rename</code> are given a retry
   * policy: on {@link IOException} or {@link S3Exception} they are retried up
   * to <code>fs.s3.maxRetries</code> times (default 4), sleeping
   * <code>fs.s3.sleepTimeSeconds</code> seconds (default 10) between attempts.
   */
  private static NativeFileSystemStore createDefaultStore(Configuration conf) {
    NativeFileSystemStore store = new Jets3tNativeFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt("fs.s3.maxRetries", 4),
        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
      new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap =
      new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("storeFile", methodPolicy);
    methodNameToPolicyMap.put("rename", methodPolicy);

    return (NativeFileSystemStore)
      RetryProxy.create(NativeFileSystemStore.class, store,
          methodNameToPolicyMap);
  }

  /**
   * Converts an absolute path into an S3 key: the URI path without its
   * leading slash and, unless the key is just "/", without a trailing slash.
   *
   * @throws IllegalArgumentException if {@code path} is not absolute
   */
  private static String pathToKey(Path path) {
    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
      // allow uris without trailing slash after bucket to refer to root,
      // like s3n://mybucket
      return "";
    }
    if (!path.isAbsolute()) {
      throw new IllegalArgumentException("Path must be absolute: " + path);
    }
    String ret = path.toUri().getPath().substring(1); // remove initial slash
    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
      ret = ret.substring(0, ret.length() - 1);
    }
    return ret;
  }

  private static Path keyToPath(String key) {
    return new Path("/" + key);
  }

  /** Resolves a relative path against the working directory. */
  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Opens an output stream for {@code f}. The data is written to the store
   * when the returned stream is closed.
   *
   * @throws IOException if {@code f} exists and {@code overwrite} is false
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {

    // test the flag first: exists() costs remote calls that are pointless
    // when the caller asked to overwrite anyway
    if (!overwrite && exists(f)) {
      throw new IOException("File already exists:" + f);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating new file '" + f + "' in S3");
    }
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
        key, progress, bufferSize), statistics);
  }

  /**
   * Deletes a file, or a directory together with everything under it.
   * A marker for the parent directory is written first so that the parent
   * does not vanish when its last entry is removed.
   *
   * @return false if {@code f} does not exist, true otherwise
   * @throws IOException if {@code f} is a non-empty directory and
   *         {@code recurse} is false
   */
  @Override
  public boolean delete(Path f, boolean recurse) throws IOException {
    FileStatus status;
    try {
      status = getFileStatus(f);
    } catch (FileNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Delete called for '" + f +
            "' but file does not exist, so returning false");
      }
      return false;
    }
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    if (status.isDirectory()) {
      if (!recurse && listStatus(f).length > 0) {
        throw new IOException("Can not delete " + f + " as it is a non-empty directory and recurse option is false");
      }

      createParent(f);

      if (LOG.isDebugEnabled()) {
        LOG.debug("Deleting directory '" + f + "'");
      }
      String priorLastKey = null;
      do {
        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
        for (FileMetadata file : listing.getFiles()) {
          store.delete(file.getKey());
        }
        priorLastKey = listing.getPriorLastKey();
      } while (priorLastKey != null);

      try {
        store.delete(key + FOLDER_SUFFIX);
      } catch (FileNotFoundException e) {
        //this is fine, we don't require a marker
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Deleting file '" + f + "'");
      }
      createParent(f);
      store.delete(key);
    }
    return true;
  }

  /**
   * Resolves {@code f} by checking, in order: the root key, an object stored
   * under the key itself (a file), a folder marker for the key, and finally
   * whether anything is listed under the key.
   *
   * @throws FileNotFoundException if none of the checks finds anything
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() == 0) { // root always exists
      return newDirectory(absolutePath);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
    }
    FileMetadata meta = store.retrieveMetadata(key);
    if (meta != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
      }
      return newFile(meta, absolutePath);
    }
    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("getFileStatus returning 'directory' for key '" + key +
            "' as '" + key + FOLDER_SUFFIX + "' exists");
      }
      return newDirectory(absolutePath);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("getFileStatus listing key '" + key + "'");
    }
    PartialListing listing = store.list(key, 1);
    if (listing.getFiles().length > 0 ||
        listing.getCommonPrefixes().length > 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("getFileStatus returning 'directory' for key '" + key +
            "' as it has contents");
      }
      return newDirectory(absolutePath);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("getFileStatus could not find key '" + key + "'");
    }
    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
  }

  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * <p>
   * If <code>f</code> is a file, this method will make a single call to S3.
   * If <code>f</code> is a directory, this method will make a maximum of
   * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
   * files and directories contained directly in <code>f</code>.
   * </p>
   *
   * @throws FileNotFoundException if nothing is listed under <code>f</code>
   *         and it has no folder marker
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    if (key.length() > 0) {
      FileMetadata meta = store.retrieveMetadata(key);
      if (meta != null) {
        return new FileStatus[] { newFile(meta, absolutePath) };
      }
    }

    URI pathUri = absolutePath.toUri();
    Set<FileStatus> status = new TreeSet<FileStatus>();
    String priorLastKey = null;
    do {
      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
      for (FileMetadata fileMetadata : listing.getFiles()) {
        if (fileMetadata.getKey().equals(key + "/")) {
          // this is just the directory we have been asked to list
          continue;
        }
        Path subpath = keyToPath(fileMetadata.getKey());
        String relativePath = pathUri.relativize(subpath.toUri()).getPath();

        if (relativePath.endsWith(FOLDER_SUFFIX)) {
          // strip exactly the trailing marker; searching for its first
          // occurrence would truncate names that contain the suffix earlier
          String dirName = relativePath.substring(
              0, relativePath.length() - FOLDER_SUFFIX.length());
          status.add(newDirectory(new Path(absolutePath, dirName)));
        } else {
          status.add(newFile(fileMetadata, subpath));
        }
      }
      for (String commonPrefix : listing.getCommonPrefixes()) {
        Path subpath = keyToPath(commonPrefix);
        String relativePath = pathUri.relativize(subpath.toUri()).getPath();
        status.add(newDirectory(new Path(absolutePath, relativePath)));
      }
      priorLastKey = listing.getPriorLastKey();
    } while (priorLastKey != null);

    if (status.isEmpty() &&
        key.length() > 0 &&
        store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }

    return status.toArray(new FileStatus[status.size()]);
  }

  private FileStatus newFile(FileMetadata meta, Path path) {
    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
        meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  private FileStatus newDirectory(Path path) {
    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
  }

  /**
   * Creates {@code f} and every missing ancestor, starting from the root.
   *
   * @throws IOException if any path on the way is an existing file
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(f);
    List<Path> paths = new ArrayList<Path>();
    do {
      // insert at the front so ancestors are processed before descendants
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (Path path : paths) {
      result &= mkdir(path);
    }
    return result;
  }

  /**
   * Writes a folder marker for {@code f} if nothing exists there yet.
   *
   * @throws IOException if {@code f} is an existing file
   */
  private boolean mkdir(Path f) throws IOException {
    try {
      FileStatus fileStatus = getFileStatus(f);
      if (fileStatus.isFile()) {
        throw new IOException(String.format(
            "Can't make directory for path '%s' since it is a file.", f));

      }
    } catch (FileNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Making dir '" + f + "' in S3");
      }
      String key = pathToKey(f) + FOLDER_SUFFIX;
      store.storeEmptyFile(key);
    }
    return true;
  }

  /**
   * Opens {@code f} for reading.
   *
   * @throws FileNotFoundException if {@code f} does not exist
   * @throws IOException if {@code f} is a directory
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
    if (fs.isDirectory()) {
      throw new IOException("'" + f + "' is a directory");
    }
    LOG.info("Opening '" + f + "' for reading");
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    return new FSDataInputStream(new BufferedFSInputStream(
        new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
  }

  // delete() uses this method to ensure that the parent directory of the
  // path being removed does not vanish.
  private void createParent(Path path) throws IOException {
    Path parent = path.getParent();
    if (parent != null) {
      String key = pathToKey(makeAbsolute(parent));
      if (key.length() > 0) {
        store.storeEmptyFile(key + FOLDER_SUFFIX);
      }
    }
  }

  /**
   * Renames {@code src} to {@code dst} by copying and then deleting keys.
   * If {@code dst} is an existing directory the source is moved into it
   * under its own name.
   *
   * @return false if {@code src} is the root or does not exist, if
   *         {@code dst} is an existing file, or if the parent of a
   *         non-existent {@code dst} is missing or is a file; true otherwise
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {

    String srcKey = pathToKey(makeAbsolute(src));

    if (srcKey.length() == 0) {
      // Cannot rename root of file system
      return false;
    }

    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";

    // Figure out the final destination
    String dstKey;
    try {
      boolean dstIsFile = getFileStatus(dst).isFile();
      if (dstIsFile) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(debugPreamble +
              "returning false as dst is an already existing file");
        }
        return false;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug(debugPreamble + "using dst as output directory");
        }
        dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
      }
    } catch (FileNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble + "using dst as output destination");
      }
      dstKey = pathToKey(makeAbsolute(dst));
      try {
        if (getFileStatus(dst.getParent()).isFile()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(debugPreamble +
                "returning false as dst parent exists and is a file");
          }
          return false;
        }
      } catch (FileNotFoundException ex) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(debugPreamble +
              "returning false as dst parent does not exist");
        }
        return false;
      }
    }

    boolean srcIsFile;
    try {
      srcIsFile = getFileStatus(src).isFile();
    } catch (FileNotFoundException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble + "returning false as src does not exist");
      }
      return false;
    }
    if (srcIsFile) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble +
            "src is file, so doing copy then delete in S3");
      }
      store.copy(srcKey, dstKey);
      store.delete(srcKey);
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble + "src is directory, so copying contents");
      }
      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);

      // copy everything first and delete afterwards, so the source keys are
      // only removed once every copy has been issued
      List<String> keysToDelete = new ArrayList<String>();
      String priorLastKey = null;
      do {
        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
        for (FileMetadata file : listing.getFiles()) {
          keysToDelete.add(file.getKey());
          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
        }
        priorLastKey = listing.getPriorLastKey();
      } while (priorLastKey != null);

      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble +
            "all files in src copied, now removing src files");
      }
      for (String key : keysToDelete) {
        store.delete(key);
      }

      try {
        store.delete(srcKey + FOLDER_SUFFIX);
      } catch (FileNotFoundException e) {
        //this is fine, we don't require a marker
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug(debugPreamble + "done");
      }
    }

    return true;
  }

  /**
   * @return the value of <code>fs.s3n.block.size</code>, or 64 MB if unset
   */
  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
  }

  /**
   * Set the working directory to the given directory.
   */
  @Override
  public void setWorkingDirectory(Path newDir) {
    workingDir = newDir;
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }
}