001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.s3native; 020 021import java.io.BufferedOutputStream; 022import java.io.File; 023import java.io.FileNotFoundException; 024import java.io.FileOutputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.net.URI; 029import java.security.DigestOutputStream; 030import java.security.MessageDigest; 031import java.security.NoSuchAlgorithmException; 032import java.util.ArrayList; 033import java.util.HashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.Set; 037import java.util.TreeSet; 038import java.util.concurrent.TimeUnit; 039 040import org.apache.commons.logging.Log; 041import org.apache.commons.logging.LogFactory; 042import org.apache.hadoop.classification.InterfaceAudience; 043import org.apache.hadoop.classification.InterfaceStability; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.fs.BufferedFSInputStream; 046import org.apache.hadoop.fs.FSDataInputStream; 047import org.apache.hadoop.fs.FSDataOutputStream; 048import org.apache.hadoop.fs.FSInputStream; 049import org.apache.hadoop.fs.FileStatus; 050import org.apache.hadoop.fs.FileSystem; 051import org.apache.hadoop.fs.Path; 052import org.apache.hadoop.fs.permission.FsPermission; 053import org.apache.hadoop.fs.s3.S3Exception; 054import org.apache.hadoop.io.retry.RetryPolicies; 055import org.apache.hadoop.io.retry.RetryPolicy; 056import org.apache.hadoop.io.retry.RetryProxy; 057import org.apache.hadoop.util.Progressable; 058 059/** 060 * <p> 061 * A {@link FileSystem} for reading and writing files stored on 062 * <a href="http://aws.amazon.com/s3">Amazon S3</a>. 063 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation 064 * stores files on S3 in their 065 * native form so they can be read by other S3 tools. 066 * 067 * A note about directories. S3 of course has no "native" support for them. 068 * The idiom we choose then is: for any directory created by this class, 069 * we use an empty object "#{dirpath}_$folder$" as a marker. 070 * Further, to interoperate with other S3 tools, we also accept the following: 071 * - an object "#{dirpath}/' denoting a directory marker 072 * - if there exists any objects with the prefix "#{dirpath}/", then the 073 * directory is said to exist 074 * - if both a file with the name of a directory and a marker for that 075 * directory exists, then the *file masks the directory*, and the directory 076 * is never returned. 077 * </p> 078 * @see org.apache.hadoop.fs.s3.S3FileSystem 079 */ 080@InterfaceAudience.Public 081@InterfaceStability.Stable 082public class NativeS3FileSystem extends FileSystem { 083 084 public static final Log LOG = 085 LogFactory.getLog(NativeS3FileSystem.class); 086 087 private static final String FOLDER_SUFFIX = "_$folder$"; 088 static final String PATH_DELIMITER = Path.SEPARATOR; 089 private static final int S3_MAX_LISTING_LENGTH = 1000; 090 091 static class NativeS3FsInputStream extends FSInputStream { 092 093 private NativeFileSystemStore store; 094 private Statistics statistics; 095 private InputStream in; 096 private final String key; 097 private long pos = 0; 098 099 public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) { 100 this.store = store; 101 this.statistics = statistics; 102 this.in = in; 103 this.key = key; 104 } 105 106 @Override 107 public synchronized int read() throws IOException { 108 int result = -1; 109 try { 110 result = in.read(); 111 } catch (IOException e) { 112 LOG.info("Received IOException while reading '" + key + "', attempting to reopen."); 113 seek(pos); 114 result = in.read(); 115 } 116 if (result != -1) { 117 pos++; 118 } 119 if (statistics != null && result != -1) { 120 statistics.incrementBytesRead(1); 121 } 122 return result; 123 } 124 @Override 125 public synchronized int read(byte[] b, int off, int len) 126 throws IOException { 127 128 int result = -1; 129 try { 130 result = in.read(b, off, len); 131 } catch (IOException e) { 132 LOG.info("Received IOException while reading '" + key + "', attempting to reopen."); 133 seek(pos); 134 result = in.read(b, off, len); 135 } 136 if (result > 0) { 137 pos += result; 138 } 139 if (statistics != null && result > 0) { 140 statistics.incrementBytesRead(result); 141 } 142 return result; 143 } 144 145 @Override 146 public void close() throws IOException { 147 in.close(); 148 } 149 150 @Override 151 public synchronized void seek(long pos) throws IOException { 152 in.close(); 153 LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'"); 154 in = store.retrieve(key, pos); 155 this.pos = pos; 156 } 157 @Override 158 public synchronized long getPos() throws IOException { 159 return pos; 160 } 161 @Override 162 public boolean seekToNewSource(long targetPos) throws IOException { 163 return false; 164 } 165 } 166 167 private class NativeS3FsOutputStream extends OutputStream { 168 169 private Configuration conf; 170 private String key; 171 private File backupFile; 172 private OutputStream backupStream; 173 private MessageDigest digest; 174 private boolean closed; 175 176 public NativeS3FsOutputStream(Configuration conf, 177 NativeFileSystemStore store, String key, Progressable progress, 178 int bufferSize) throws IOException { 179 this.conf = conf; 180 this.key = key; 181 this.backupFile = newBackupFile(); 182 LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'"); 183 try { 184 this.digest = MessageDigest.getInstance("MD5"); 185 this.backupStream = new BufferedOutputStream(new DigestOutputStream( 186 new FileOutputStream(backupFile), this.digest)); 187 } catch (NoSuchAlgorithmException e) { 188 LOG.warn("Cannot load MD5 digest algorithm," + 189 "skipping message integrity check.", e); 190 this.backupStream = new BufferedOutputStream( 191 new FileOutputStream(backupFile)); 192 } 193 } 194 195 private File newBackupFile() throws IOException { 196 File dir = new File(conf.get("fs.s3.buffer.dir")); 197 if (!dir.mkdirs() && !dir.exists()) { 198 throw new IOException("Cannot create S3 buffer directory: " + dir); 199 } 200 File result = File.createTempFile("output-", ".tmp", dir); 201 result.deleteOnExit(); 202 return result; 203 } 204 205 @Override 206 public void flush() throws IOException { 207 backupStream.flush(); 208 } 209 210 @Override 211 public synchronized void close() throws IOException { 212 if (closed) { 213 return; 214 } 215 216 backupStream.close(); 217 LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload"); 218 219 try { 220 byte[] md5Hash = digest == null ? null : digest.digest(); 221 store.storeFile(key, backupFile, md5Hash); 222 } finally { 223 if (!backupFile.delete()) { 224 LOG.warn("Could not delete temporary s3n file: " + backupFile); 225 } 226 super.close(); 227 closed = true; 228 } 229 LOG.info("OutputStream for key '" + key + "' upload complete"); 230 } 231 232 @Override 233 public void write(int b) throws IOException { 234 backupStream.write(b); 235 } 236 237 @Override 238 public void write(byte[] b, int off, int len) throws IOException { 239 backupStream.write(b, off, len); 240 } 241 } 242 243 private URI uri; 244 private NativeFileSystemStore store; 245 private Path workingDir; 246 247 public NativeS3FileSystem() { 248 // set store in initialize() 249 } 250 251 public NativeS3FileSystem(NativeFileSystemStore store) { 252 this.store = store; 253 } 254 255 @Override 256 public void initialize(URI uri, Configuration conf) throws IOException { 257 super.initialize(uri, conf); 258 if (store == null) { 259 store = createDefaultStore(conf); 260 } 261 store.initialize(uri, conf); 262 setConf(conf); 263 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 264 this.workingDir = 265 new Path("/user", System.getProperty("user.name")).makeQualified(this); 266 } 267 268 private static NativeFileSystemStore createDefaultStore(Configuration conf) { 269 NativeFileSystemStore store = new Jets3tNativeFileSystemStore(); 270 271 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 272 conf.getInt("fs.s3.maxRetries", 4), 273 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); 274 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 275 new HashMap<Class<? extends Exception>, RetryPolicy>(); 276 exceptionToPolicyMap.put(IOException.class, basePolicy); 277 exceptionToPolicyMap.put(S3Exception.class, basePolicy); 278 279 RetryPolicy methodPolicy = RetryPolicies.retryByException( 280 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 281 Map<String, RetryPolicy> methodNameToPolicyMap = 282 new HashMap<String, RetryPolicy>(); 283 methodNameToPolicyMap.put("storeFile", methodPolicy); 284 methodNameToPolicyMap.put("rename", methodPolicy); 285 286 return (NativeFileSystemStore) 287 RetryProxy.create(NativeFileSystemStore.class, store, 288 methodNameToPolicyMap); 289 } 290 291 private static String pathToKey(Path path) { 292 if (path.toUri().getScheme() != null && "".equals(path.toUri().getPath())) { 293 // allow uris without trailing slash after bucket to refer to root, 294 // like s3n://mybucket 295 return ""; 296 } 297 if (!path.isAbsolute()) { 298 throw new IllegalArgumentException("Path must be absolute: " + path); 299 } 300 String ret = path.toUri().getPath().substring(1); // remove initial slash 301 if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) { 302 ret = ret.substring(0, ret.length() -1); 303 } 304 return ret; 305 } 306 307 private static Path keyToPath(String key) { 308 return new Path("/" + key); 309 } 310 311 private Path makeAbsolute(Path path) { 312 if (path.isAbsolute()) { 313 return path; 314 } 315 return new Path(workingDir, path); 316 } 317 318 /** This optional operation is not yet supported. */ 319 @Override 320 public FSDataOutputStream append(Path f, int bufferSize, 321 Progressable progress) throws IOException { 322 throw new IOException("Not supported"); 323 } 324 325 @Override 326 public FSDataOutputStream create(Path f, FsPermission permission, 327 boolean overwrite, int bufferSize, short replication, long blockSize, 328 Progressable progress) throws IOException { 329 330 if (exists(f) && !overwrite) { 331 throw new IOException("File already exists:"+f); 332 } 333 334 if(LOG.isDebugEnabled()) { 335 LOG.debug("Creating new file '" + f + "' in S3"); 336 } 337 Path absolutePath = makeAbsolute(f); 338 String key = pathToKey(absolutePath); 339 return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store, 340 key, progress, bufferSize), statistics); 341 } 342 343 @Override 344 public boolean delete(Path f, boolean recurse) throws IOException { 345 FileStatus status; 346 try { 347 status = getFileStatus(f); 348 } catch (FileNotFoundException e) { 349 if(LOG.isDebugEnabled()) { 350 LOG.debug("Delete called for '" + f + 351 "' but file does not exist, so returning false"); 352 } 353 return false; 354 } 355 Path absolutePath = makeAbsolute(f); 356 String key = pathToKey(absolutePath); 357 if (status.isDirectory()) { 358 if (!recurse && listStatus(f).length > 0) { 359 throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false"); 360 } 361 362 createParent(f); 363 364 if(LOG.isDebugEnabled()) { 365 LOG.debug("Deleting directory '" + f + "'"); 366 } 367 String priorLastKey = null; 368 do { 369 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true); 370 for (FileMetadata file : listing.getFiles()) { 371 store.delete(file.getKey()); 372 } 373 priorLastKey = listing.getPriorLastKey(); 374 } while (priorLastKey != null); 375 376 try { 377 store.delete(key + FOLDER_SUFFIX); 378 } catch (FileNotFoundException e) { 379 //this is fine, we don't require a marker 380 } 381 } else { 382 if(LOG.isDebugEnabled()) { 383 LOG.debug("Deleting file '" + f + "'"); 384 } 385 createParent(f); 386 store.delete(key); 387 } 388 return true; 389 } 390 391 @Override 392 public FileStatus getFileStatus(Path f) throws IOException { 393 Path absolutePath = makeAbsolute(f); 394 String key = pathToKey(absolutePath); 395 396 if (key.length() == 0) { // root always exists 397 return newDirectory(absolutePath); 398 } 399 400 if(LOG.isDebugEnabled()) { 401 LOG.debug("getFileStatus retrieving metadata for key '" + key + "'"); 402 } 403 FileMetadata meta = store.retrieveMetadata(key); 404 if (meta != null) { 405 if(LOG.isDebugEnabled()) { 406 LOG.debug("getFileStatus returning 'file' for key '" + key + "'"); 407 } 408 return newFile(meta, absolutePath); 409 } 410 if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) { 411 if(LOG.isDebugEnabled()) { 412 LOG.debug("getFileStatus returning 'directory' for key '" + key + 413 "' as '" + key + FOLDER_SUFFIX + "' exists"); 414 } 415 return newDirectory(absolutePath); 416 } 417 418 if(LOG.isDebugEnabled()) { 419 LOG.debug("getFileStatus listing key '" + key + "'"); 420 } 421 PartialListing listing = store.list(key, 1); 422 if (listing.getFiles().length > 0 || 423 listing.getCommonPrefixes().length > 0) { 424 if(LOG.isDebugEnabled()) { 425 LOG.debug("getFileStatus returning 'directory' for key '" + key + 426 "' as it has contents"); 427 } 428 return newDirectory(absolutePath); 429 } 430 431 if(LOG.isDebugEnabled()) { 432 LOG.debug("getFileStatus could not find key '" + key + "'"); 433 } 434 throw new FileNotFoundException("No such file or directory '" + absolutePath + "'"); 435 } 436 437 @Override 438 public URI getUri() { 439 return uri; 440 } 441 442 /** 443 * <p> 444 * If <code>f</code> is a file, this method will make a single call to S3. 445 * If <code>f</code> is a directory, this method will make a maximum of 446 * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of 447 * files and directories contained directly in <code>f</code>. 448 * </p> 449 */ 450 @Override 451 public FileStatus[] listStatus(Path f) throws IOException { 452 453 Path absolutePath = makeAbsolute(f); 454 String key = pathToKey(absolutePath); 455 456 if (key.length() > 0) { 457 FileMetadata meta = store.retrieveMetadata(key); 458 if (meta != null) { 459 return new FileStatus[] { newFile(meta, absolutePath) }; 460 } 461 } 462 463 URI pathUri = absolutePath.toUri(); 464 Set<FileStatus> status = new TreeSet<FileStatus>(); 465 String priorLastKey = null; 466 do { 467 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false); 468 for (FileMetadata fileMetadata : listing.getFiles()) { 469 Path subpath = keyToPath(fileMetadata.getKey()); 470 String relativePath = pathUri.relativize(subpath.toUri()).getPath(); 471 472 if (fileMetadata.getKey().equals(key + "/")) { 473 // this is just the directory we have been asked to list 474 } 475 else if (relativePath.endsWith(FOLDER_SUFFIX)) { 476 status.add(newDirectory(new Path( 477 absolutePath, 478 relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX))))); 479 } 480 else { 481 status.add(newFile(fileMetadata, subpath)); 482 } 483 } 484 for (String commonPrefix : listing.getCommonPrefixes()) { 485 Path subpath = keyToPath(commonPrefix); 486 String relativePath = pathUri.relativize(subpath.toUri()).getPath(); 487 status.add(newDirectory(new Path(absolutePath, relativePath))); 488 } 489 priorLastKey = listing.getPriorLastKey(); 490 } while (priorLastKey != null); 491 492 if (status.isEmpty() && 493 key.length() > 0 && 494 store.retrieveMetadata(key + FOLDER_SUFFIX) == null) { 495 throw new FileNotFoundException("File " + f + " does not exist."); 496 } 497 498 return status.toArray(new FileStatus[status.size()]); 499 } 500 501 private FileStatus newFile(FileMetadata meta, Path path) { 502 return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(), 503 meta.getLastModified(), path.makeQualified(this)); 504 } 505 506 private FileStatus newDirectory(Path path) { 507 return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this)); 508 } 509 510 @Override 511 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 512 Path absolutePath = makeAbsolute(f); 513 List<Path> paths = new ArrayList<Path>(); 514 do { 515 paths.add(0, absolutePath); 516 absolutePath = absolutePath.getParent(); 517 } while (absolutePath != null); 518 519 boolean result = true; 520 for (Path path : paths) { 521 result &= mkdir(path); 522 } 523 return result; 524 } 525 526 private boolean mkdir(Path f) throws IOException { 527 try { 528 FileStatus fileStatus = getFileStatus(f); 529 if (fileStatus.isFile()) { 530 throw new IOException(String.format( 531 "Can't make directory for path '%s' since it is a file.", f)); 532 533 } 534 } catch (FileNotFoundException e) { 535 if(LOG.isDebugEnabled()) { 536 LOG.debug("Making dir '" + f + "' in S3"); 537 } 538 String key = pathToKey(f) + FOLDER_SUFFIX; 539 store.storeEmptyFile(key); 540 } 541 return true; 542 } 543 544 @Override 545 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 546 FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist 547 if (fs.isDirectory()) { 548 throw new IOException("'" + f + "' is a directory"); 549 } 550 LOG.info("Opening '" + f + "' for reading"); 551 Path absolutePath = makeAbsolute(f); 552 String key = pathToKey(absolutePath); 553 return new FSDataInputStream(new BufferedFSInputStream( 554 new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize)); 555 } 556 557 // rename() and delete() use this method to ensure that the parent directory 558 // of the source does not vanish. 559 private void createParent(Path path) throws IOException { 560 Path parent = path.getParent(); 561 if (parent != null) { 562 String key = pathToKey(makeAbsolute(parent)); 563 if (key.length() > 0) { 564 store.storeEmptyFile(key + FOLDER_SUFFIX); 565 } 566 } 567 } 568 569 570 @Override 571 public boolean rename(Path src, Path dst) throws IOException { 572 573 String srcKey = pathToKey(makeAbsolute(src)); 574 575 if (srcKey.length() == 0) { 576 // Cannot rename root of file system 577 return false; 578 } 579 580 final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - "; 581 582 // Figure out the final destination 583 String dstKey; 584 try { 585 boolean dstIsFile = getFileStatus(dst).isFile(); 586 if (dstIsFile) { 587 if(LOG.isDebugEnabled()) { 588 LOG.debug(debugPreamble + 589 "returning false as dst is an already existing file"); 590 } 591 return false; 592 } else { 593 if(LOG.isDebugEnabled()) { 594 LOG.debug(debugPreamble + "using dst as output directory"); 595 } 596 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); 597 } 598 } catch (FileNotFoundException e) { 599 if(LOG.isDebugEnabled()) { 600 LOG.debug(debugPreamble + "using dst as output destination"); 601 } 602 dstKey = pathToKey(makeAbsolute(dst)); 603 try { 604 if (getFileStatus(dst.getParent()).isFile()) { 605 if(LOG.isDebugEnabled()) { 606 LOG.debug(debugPreamble + 607 "returning false as dst parent exists and is a file"); 608 } 609 return false; 610 } 611 } catch (FileNotFoundException ex) { 612 if(LOG.isDebugEnabled()) { 613 LOG.debug(debugPreamble + 614 "returning false as dst parent does not exist"); 615 } 616 return false; 617 } 618 } 619 620 boolean srcIsFile; 621 try { 622 srcIsFile = getFileStatus(src).isFile(); 623 } catch (FileNotFoundException e) { 624 if(LOG.isDebugEnabled()) { 625 LOG.debug(debugPreamble + "returning false as src does not exist"); 626 } 627 return false; 628 } 629 if (srcIsFile) { 630 if(LOG.isDebugEnabled()) { 631 LOG.debug(debugPreamble + 632 "src is file, so doing copy then delete in S3"); 633 } 634 store.copy(srcKey, dstKey); 635 store.delete(srcKey); 636 } else { 637 if(LOG.isDebugEnabled()) { 638 LOG.debug(debugPreamble + "src is directory, so copying contents"); 639 } 640 store.storeEmptyFile(dstKey + FOLDER_SUFFIX); 641 642 List<String> keysToDelete = new ArrayList<String>(); 643 String priorLastKey = null; 644 do { 645 PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true); 646 for (FileMetadata file : listing.getFiles()) { 647 keysToDelete.add(file.getKey()); 648 store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length())); 649 } 650 priorLastKey = listing.getPriorLastKey(); 651 } while (priorLastKey != null); 652 653 if(LOG.isDebugEnabled()) { 654 LOG.debug(debugPreamble + 655 "all files in src copied, now removing src files"); 656 } 657 for (String key: keysToDelete) { 658 store.delete(key); 659 } 660 661 try { 662 store.delete(srcKey + FOLDER_SUFFIX); 663 } catch (FileNotFoundException e) { 664 //this is fine, we don't require a marker 665 } 666 if(LOG.isDebugEnabled()) { 667 LOG.debug(debugPreamble + "done"); 668 } 669 } 670 671 return true; 672 } 673 674 @Override 675 public long getDefaultBlockSize() { 676 return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024); 677 } 678 679 /** 680 * Set the working directory to the given directory. 681 */ 682 @Override 683 public void setWorkingDirectory(Path newDir) { 684 workingDir = newDir; 685 } 686 687 @Override 688 public Path getWorkingDirectory() { 689 return workingDir; 690 } 691}