001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs.s3native;
020
021 import java.io.BufferedOutputStream;
022 import java.io.File;
023 import java.io.FileNotFoundException;
024 import java.io.FileOutputStream;
025 import java.io.IOException;
026 import java.io.InputStream;
027 import java.io.OutputStream;
028 import java.net.URI;
029 import java.security.DigestOutputStream;
030 import java.security.MessageDigest;
031 import java.security.NoSuchAlgorithmException;
032 import java.util.ArrayList;
033 import java.util.HashMap;
034 import java.util.List;
035 import java.util.Map;
036 import java.util.Set;
037 import java.util.TreeSet;
038 import java.util.concurrent.TimeUnit;
039
040 import org.apache.commons.logging.Log;
041 import org.apache.commons.logging.LogFactory;
042 import org.apache.hadoop.classification.InterfaceAudience;
043 import org.apache.hadoop.classification.InterfaceStability;
044 import org.apache.hadoop.conf.Configuration;
045 import org.apache.hadoop.fs.BufferedFSInputStream;
046 import org.apache.hadoop.fs.FSDataInputStream;
047 import org.apache.hadoop.fs.FSDataOutputStream;
048 import org.apache.hadoop.fs.FSInputStream;
049 import org.apache.hadoop.fs.FileStatus;
050 import org.apache.hadoop.fs.FileSystem;
051 import org.apache.hadoop.fs.Path;
052 import org.apache.hadoop.fs.permission.FsPermission;
053 import org.apache.hadoop.fs.s3.S3Exception;
054 import org.apache.hadoop.io.retry.RetryPolicies;
055 import org.apache.hadoop.io.retry.RetryPolicy;
056 import org.apache.hadoop.io.retry.RetryProxy;
057 import org.apache.hadoop.util.Progressable;
058
059 /**
060 * <p>
061 * A {@link FileSystem} for reading and writing files stored on
062 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
063 * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
064 * stores files on S3 in their
065 * native form so they can be read by other S3 tools.
066 *
067 * A note about directories. S3 of course has no "native" support for them.
068 * The idiom we choose then is: for any directory created by this class,
069 * we use an empty object "#{dirpath}_$folder$" as a marker.
070 * Further, to interoperate with other S3 tools, we also accept the following:
 * - an object "#{dirpath}/" denoting a directory marker
 * - if there exist any objects with the prefix "#{dirpath}/", then the
 *   directory is said to exist
 * - if both a file with the name of a directory and a marker for that
 *   directory exist, then the *file masks the directory*, and the directory
 *   is never returned.
077 * </p>
078 * @see org.apache.hadoop.fs.s3.S3FileSystem
079 */
080 @InterfaceAudience.Public
081 @InterfaceStability.Stable
082 public class NativeS3FileSystem extends FileSystem {
083
084 public static final Log LOG =
085 LogFactory.getLog(NativeS3FileSystem.class);
086
087 private static final String FOLDER_SUFFIX = "_$folder$";
088 static final String PATH_DELIMITER = Path.SEPARATOR;
089 private static final int S3_MAX_LISTING_LENGTH = 1000;
090
091 static class NativeS3FsInputStream extends FSInputStream {
092
093 private NativeFileSystemStore store;
094 private Statistics statistics;
095 private InputStream in;
096 private final String key;
097 private long pos = 0;
098
099 public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
100 this.store = store;
101 this.statistics = statistics;
102 this.in = in;
103 this.key = key;
104 }
105
106 @Override
107 public synchronized int read() throws IOException {
108 int result = -1;
109 try {
110 result = in.read();
111 } catch (IOException e) {
112 LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
113 seek(pos);
114 result = in.read();
115 }
116 if (result != -1) {
117 pos++;
118 }
119 if (statistics != null && result != -1) {
120 statistics.incrementBytesRead(1);
121 }
122 return result;
123 }
124 @Override
125 public synchronized int read(byte[] b, int off, int len)
126 throws IOException {
127
128 int result = -1;
129 try {
130 result = in.read(b, off, len);
131 } catch (IOException e) {
132 LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
133 seek(pos);
134 result = in.read(b, off, len);
135 }
136 if (result > 0) {
137 pos += result;
138 }
139 if (statistics != null && result > 0) {
140 statistics.incrementBytesRead(result);
141 }
142 return result;
143 }
144
145 @Override
146 public void close() throws IOException {
147 in.close();
148 }
149
150 @Override
151 public synchronized void seek(long pos) throws IOException {
152 in.close();
153 LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'");
154 in = store.retrieve(key, pos);
155 this.pos = pos;
156 }
157 @Override
158 public synchronized long getPos() throws IOException {
159 return pos;
160 }
161 @Override
162 public boolean seekToNewSource(long targetPos) throws IOException {
163 return false;
164 }
165 }
166
167 private class NativeS3FsOutputStream extends OutputStream {
168
169 private Configuration conf;
170 private String key;
171 private File backupFile;
172 private OutputStream backupStream;
173 private MessageDigest digest;
174 private boolean closed;
175
176 public NativeS3FsOutputStream(Configuration conf,
177 NativeFileSystemStore store, String key, Progressable progress,
178 int bufferSize) throws IOException {
179 this.conf = conf;
180 this.key = key;
181 this.backupFile = newBackupFile();
182 LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
183 try {
184 this.digest = MessageDigest.getInstance("MD5");
185 this.backupStream = new BufferedOutputStream(new DigestOutputStream(
186 new FileOutputStream(backupFile), this.digest));
187 } catch (NoSuchAlgorithmException e) {
188 LOG.warn("Cannot load MD5 digest algorithm," +
189 "skipping message integrity check.", e);
190 this.backupStream = new BufferedOutputStream(
191 new FileOutputStream(backupFile));
192 }
193 }
194
195 private File newBackupFile() throws IOException {
196 File dir = new File(conf.get("fs.s3.buffer.dir"));
197 if (!dir.mkdirs() && !dir.exists()) {
198 throw new IOException("Cannot create S3 buffer directory: " + dir);
199 }
200 File result = File.createTempFile("output-", ".tmp", dir);
201 result.deleteOnExit();
202 return result;
203 }
204
205 @Override
206 public void flush() throws IOException {
207 backupStream.flush();
208 }
209
210 @Override
211 public synchronized void close() throws IOException {
212 if (closed) {
213 return;
214 }
215
216 backupStream.close();
217 LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
218
219 try {
220 byte[] md5Hash = digest == null ? null : digest.digest();
221 store.storeFile(key, backupFile, md5Hash);
222 } finally {
223 if (!backupFile.delete()) {
224 LOG.warn("Could not delete temporary s3n file: " + backupFile);
225 }
226 super.close();
227 closed = true;
228 }
229 LOG.info("OutputStream for key '" + key + "' upload complete");
230 }
231
232 @Override
233 public void write(int b) throws IOException {
234 backupStream.write(b);
235 }
236
237 @Override
238 public void write(byte[] b, int off, int len) throws IOException {
239 backupStream.write(b, off, len);
240 }
241 }
242
243 private URI uri;
244 private NativeFileSystemStore store;
245 private Path workingDir;
246
247 public NativeS3FileSystem() {
248 // set store in initialize()
249 }
250
/**
 * Creates a file system backed by the supplied store.
 *
 * @param store the store this file system reads from and writes to
 */
public NativeS3FileSystem(NativeFileSystemStore store) {
  this.store = store;
}
254
255 /**
256 * Return the protocol scheme for the FileSystem.
257 * <p/>
258 *
259 * @return <code>s3n</code>
260 */
261 @Override
262 public String getScheme() {
263 return "s3n";
264 }
265
266 @Override
267 public void initialize(URI uri, Configuration conf) throws IOException {
268 super.initialize(uri, conf);
269 if (store == null) {
270 store = createDefaultStore(conf);
271 }
272 store.initialize(uri, conf);
273 setConf(conf);
274 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
275 this.workingDir =
276 new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
277 }
278
279 private static NativeFileSystemStore createDefaultStore(Configuration conf) {
280 NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
281
282 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
283 conf.getInt("fs.s3.maxRetries", 4),
284 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
285 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
286 new HashMap<Class<? extends Exception>, RetryPolicy>();
287 exceptionToPolicyMap.put(IOException.class, basePolicy);
288 exceptionToPolicyMap.put(S3Exception.class, basePolicy);
289
290 RetryPolicy methodPolicy = RetryPolicies.retryByException(
291 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
292 Map<String, RetryPolicy> methodNameToPolicyMap =
293 new HashMap<String, RetryPolicy>();
294 methodNameToPolicyMap.put("storeFile", methodPolicy);
295 methodNameToPolicyMap.put("rename", methodPolicy);
296
297 return (NativeFileSystemStore)
298 RetryProxy.create(NativeFileSystemStore.class, store,
299 methodNameToPolicyMap);
300 }
301
302 private static String pathToKey(Path path) {
303 if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
304 // allow uris without trailing slash after bucket to refer to root,
305 // like s3n://mybucket
306 return "";
307 }
308 if (!path.isAbsolute()) {
309 throw new IllegalArgumentException("Path must be absolute: " + path);
310 }
311 String ret = path.toUri().getPath().substring(1); // remove initial slash
312 if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
313 ret = ret.substring(0, ret.length() -1);
314 }
315 return ret;
316 }
317
318 private static Path keyToPath(String key) {
319 return new Path("/" + key);
320 }
321
322 private Path makeAbsolute(Path path) {
323 if (path.isAbsolute()) {
324 return path;
325 }
326 return new Path(workingDir, path);
327 }
328
329 /** This optional operation is not yet supported. */
330 @Override
331 public FSDataOutputStream append(Path f, int bufferSize,
332 Progressable progress) throws IOException {
333 throw new IOException("Not supported");
334 }
335
336 @Override
337 public FSDataOutputStream create(Path f, FsPermission permission,
338 boolean overwrite, int bufferSize, short replication, long blockSize,
339 Progressable progress) throws IOException {
340
341 if (exists(f) && !overwrite) {
342 throw new IOException("File already exists:"+f);
343 }
344
345 if(LOG.isDebugEnabled()) {
346 LOG.debug("Creating new file '" + f + "' in S3");
347 }
348 Path absolutePath = makeAbsolute(f);
349 String key = pathToKey(absolutePath);
350 return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
351 key, progress, bufferSize), statistics);
352 }
353
354 @Override
355 public boolean delete(Path f, boolean recurse) throws IOException {
356 FileStatus status;
357 try {
358 status = getFileStatus(f);
359 } catch (FileNotFoundException e) {
360 if(LOG.isDebugEnabled()) {
361 LOG.debug("Delete called for '" + f +
362 "' but file does not exist, so returning false");
363 }
364 return false;
365 }
366 Path absolutePath = makeAbsolute(f);
367 String key = pathToKey(absolutePath);
368 if (status.isDirectory()) {
369 if (!recurse && listStatus(f).length > 0) {
370 throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false");
371 }
372
373 createParent(f);
374
375 if(LOG.isDebugEnabled()) {
376 LOG.debug("Deleting directory '" + f + "'");
377 }
378 String priorLastKey = null;
379 do {
380 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
381 for (FileMetadata file : listing.getFiles()) {
382 store.delete(file.getKey());
383 }
384 priorLastKey = listing.getPriorLastKey();
385 } while (priorLastKey != null);
386
387 try {
388 store.delete(key + FOLDER_SUFFIX);
389 } catch (FileNotFoundException e) {
390 //this is fine, we don't require a marker
391 }
392 } else {
393 if(LOG.isDebugEnabled()) {
394 LOG.debug("Deleting file '" + f + "'");
395 }
396 createParent(f);
397 store.delete(key);
398 }
399 return true;
400 }
401
402 @Override
403 public FileStatus getFileStatus(Path f) throws IOException {
404 Path absolutePath = makeAbsolute(f);
405 String key = pathToKey(absolutePath);
406
407 if (key.length() == 0) { // root always exists
408 return newDirectory(absolutePath);
409 }
410
411 if(LOG.isDebugEnabled()) {
412 LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
413 }
414 FileMetadata meta = store.retrieveMetadata(key);
415 if (meta != null) {
416 if(LOG.isDebugEnabled()) {
417 LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
418 }
419 return newFile(meta, absolutePath);
420 }
421 if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
422 if(LOG.isDebugEnabled()) {
423 LOG.debug("getFileStatus returning 'directory' for key '" + key +
424 "' as '" + key + FOLDER_SUFFIX + "' exists");
425 }
426 return newDirectory(absolutePath);
427 }
428
429 if(LOG.isDebugEnabled()) {
430 LOG.debug("getFileStatus listing key '" + key + "'");
431 }
432 PartialListing listing = store.list(key, 1);
433 if (listing.getFiles().length > 0 ||
434 listing.getCommonPrefixes().length > 0) {
435 if(LOG.isDebugEnabled()) {
436 LOG.debug("getFileStatus returning 'directory' for key '" + key +
437 "' as it has contents");
438 }
439 return newDirectory(absolutePath);
440 }
441
442 if(LOG.isDebugEnabled()) {
443 LOG.debug("getFileStatus could not find key '" + key + "'");
444 }
445 throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
446 }
447
448 @Override
449 public URI getUri() {
450 return uri;
451 }
452
453 /**
454 * <p>
455 * If <code>f</code> is a file, this method will make a single call to S3.
456 * If <code>f</code> is a directory, this method will make a maximum of
457 * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
458 * files and directories contained directly in <code>f</code>.
459 * </p>
460 */
461 @Override
462 public FileStatus[] listStatus(Path f) throws IOException {
463
464 Path absolutePath = makeAbsolute(f);
465 String key = pathToKey(absolutePath);
466
467 if (key.length() > 0) {
468 FileMetadata meta = store.retrieveMetadata(key);
469 if (meta != null) {
470 return new FileStatus[] { newFile(meta, absolutePath) };
471 }
472 }
473
474 URI pathUri = absolutePath.toUri();
475 Set<FileStatus> status = new TreeSet<FileStatus>();
476 String priorLastKey = null;
477 do {
478 PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
479 for (FileMetadata fileMetadata : listing.getFiles()) {
480 Path subpath = keyToPath(fileMetadata.getKey());
481 String relativePath = pathUri.relativize(subpath.toUri()).getPath();
482
483 if (fileMetadata.getKey().equals(key + "/")) {
484 // this is just the directory we have been asked to list
485 }
486 else if (relativePath.endsWith(FOLDER_SUFFIX)) {
487 status.add(newDirectory(new Path(
488 absolutePath,
489 relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
490 }
491 else {
492 status.add(newFile(fileMetadata, subpath));
493 }
494 }
495 for (String commonPrefix : listing.getCommonPrefixes()) {
496 Path subpath = keyToPath(commonPrefix);
497 String relativePath = pathUri.relativize(subpath.toUri()).getPath();
498 status.add(newDirectory(new Path(absolutePath, relativePath)));
499 }
500 priorLastKey = listing.getPriorLastKey();
501 } while (priorLastKey != null);
502
503 if (status.isEmpty() &&
504 key.length() > 0 &&
505 store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
506 throw new FileNotFoundException("File " + f + " does not exist.");
507 }
508
509 return status.toArray(new FileStatus[status.size()]);
510 }
511
512 private FileStatus newFile(FileMetadata meta, Path path) {
513 return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
514 meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
515 }
516
517 private FileStatus newDirectory(Path path) {
518 return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
519 }
520
521 @Override
522 public boolean mkdirs(Path f, FsPermission permission) throws IOException {
523 Path absolutePath = makeAbsolute(f);
524 List<Path> paths = new ArrayList<Path>();
525 do {
526 paths.add(0, absolutePath);
527 absolutePath = absolutePath.getParent();
528 } while (absolutePath != null);
529
530 boolean result = true;
531 for (Path path : paths) {
532 result &= mkdir(path);
533 }
534 return result;
535 }
536
537 private boolean mkdir(Path f) throws IOException {
538 try {
539 FileStatus fileStatus = getFileStatus(f);
540 if (fileStatus.isFile()) {
541 throw new IOException(String.format(
542 "Can't make directory for path '%s' since it is a file.", f));
543
544 }
545 } catch (FileNotFoundException e) {
546 if(LOG.isDebugEnabled()) {
547 LOG.debug("Making dir '" + f + "' in S3");
548 }
549 String key = pathToKey(f) + FOLDER_SUFFIX;
550 store.storeEmptyFile(key);
551 }
552 return true;
553 }
554
555 @Override
556 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
557 FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
558 if (fs.isDirectory()) {
559 throw new IOException("'" + f + "' is a directory");
560 }
561 LOG.info("Opening '" + f + "' for reading");
562 Path absolutePath = makeAbsolute(f);
563 String key = pathToKey(absolutePath);
564 return new FSDataInputStream(new BufferedFSInputStream(
565 new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
566 }
567
568 // rename() and delete() use this method to ensure that the parent directory
569 // of the source does not vanish.
570 private void createParent(Path path) throws IOException {
571 Path parent = path.getParent();
572 if (parent != null) {
573 String key = pathToKey(makeAbsolute(parent));
574 if (key.length() > 0) {
575 store.storeEmptyFile(key + FOLDER_SUFFIX);
576 }
577 }
578 }
579
580
581 @Override
582 public boolean rename(Path src, Path dst) throws IOException {
583
584 String srcKey = pathToKey(makeAbsolute(src));
585
586 if (srcKey.length() == 0) {
587 // Cannot rename root of file system
588 return false;
589 }
590
591 final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
592
593 // Figure out the final destination
594 String dstKey;
595 try {
596 boolean dstIsFile = getFileStatus(dst).isFile();
597 if (dstIsFile) {
598 if(LOG.isDebugEnabled()) {
599 LOG.debug(debugPreamble +
600 "returning false as dst is an already existing file");
601 }
602 return false;
603 } else {
604 if(LOG.isDebugEnabled()) {
605 LOG.debug(debugPreamble + "using dst as output directory");
606 }
607 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
608 }
609 } catch (FileNotFoundException e) {
610 if(LOG.isDebugEnabled()) {
611 LOG.debug(debugPreamble + "using dst as output destination");
612 }
613 dstKey = pathToKey(makeAbsolute(dst));
614 try {
615 if (getFileStatus(dst.getParent()).isFile()) {
616 if(LOG.isDebugEnabled()) {
617 LOG.debug(debugPreamble +
618 "returning false as dst parent exists and is a file");
619 }
620 return false;
621 }
622 } catch (FileNotFoundException ex) {
623 if(LOG.isDebugEnabled()) {
624 LOG.debug(debugPreamble +
625 "returning false as dst parent does not exist");
626 }
627 return false;
628 }
629 }
630
631 boolean srcIsFile;
632 try {
633 srcIsFile = getFileStatus(src).isFile();
634 } catch (FileNotFoundException e) {
635 if(LOG.isDebugEnabled()) {
636 LOG.debug(debugPreamble + "returning false as src does not exist");
637 }
638 return false;
639 }
640 if (srcIsFile) {
641 if(LOG.isDebugEnabled()) {
642 LOG.debug(debugPreamble +
643 "src is file, so doing copy then delete in S3");
644 }
645 store.copy(srcKey, dstKey);
646 store.delete(srcKey);
647 } else {
648 if(LOG.isDebugEnabled()) {
649 LOG.debug(debugPreamble + "src is directory, so copying contents");
650 }
651 store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
652
653 List<String> keysToDelete = new ArrayList<String>();
654 String priorLastKey = null;
655 do {
656 PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
657 for (FileMetadata file : listing.getFiles()) {
658 keysToDelete.add(file.getKey());
659 store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
660 }
661 priorLastKey = listing.getPriorLastKey();
662 } while (priorLastKey != null);
663
664 if(LOG.isDebugEnabled()) {
665 LOG.debug(debugPreamble +
666 "all files in src copied, now removing src files");
667 }
668 for (String key: keysToDelete) {
669 store.delete(key);
670 }
671
672 try {
673 store.delete(srcKey + FOLDER_SUFFIX);
674 } catch (FileNotFoundException e) {
675 //this is fine, we don't require a marker
676 }
677 if(LOG.isDebugEnabled()) {
678 LOG.debug(debugPreamble + "done");
679 }
680 }
681
682 return true;
683 }
684
685 @Override
686 public long getDefaultBlockSize() {
687 return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
688 }
689
690 /**
691 * Set the working directory to the given directory.
692 */
693 @Override
694 public void setWorkingDirectory(Path newDir) {
695 workingDir = newDir;
696 }
697
698 @Override
699 public Path getWorkingDirectory() {
700 return workingDir;
701 }
702
703 @Override
704 public String getCanonicalServiceName() {
705 // Does not support Token
706 return null;
707 }
708 }