001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs.s3native;
020    
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3.S3Exception;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;
058    
059    /**
060     * <p>
061     * A {@link FileSystem} for reading and writing files stored on
062     * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
063     * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
064     * stores files on S3 in their
065     * native form so they can be read by other S3 tools.
066     *
067     * A note about directories. S3 of course has no "native" support for them.
068     * The idiom we choose then is: for any directory created by this class,
069     * we use an empty object "#{dirpath}_$folder$" as a marker.
070     * Further, to interoperate with other S3 tools, we also accept the following:
071     *  - an object "#{dirpath}/' denoting a directory marker
072     *  - if there exists any objects with the prefix "#{dirpath}/", then the
073     *    directory is said to exist
074     *  - if both a file with the name of a directory and a marker for that
075     *    directory exists, then the *file masks the directory*, and the directory
076     *    is never returned.
077     * </p>
078     * @see org.apache.hadoop.fs.s3.S3FileSystem
079     */
080    @InterfaceAudience.Public
081    @InterfaceStability.Stable
082    public class NativeS3FileSystem extends FileSystem {
083      
084      public static final Log LOG = 
085        LogFactory.getLog(NativeS3FileSystem.class);
086      
087      private static final String FOLDER_SUFFIX = "_$folder$";
088      static final String PATH_DELIMITER = Path.SEPARATOR;
089      private static final int S3_MAX_LISTING_LENGTH = 1000;
090      
091      static class NativeS3FsInputStream extends FSInputStream {
092        
093        private NativeFileSystemStore store;
094        private Statistics statistics;
095        private InputStream in;
096        private final String key;
097        private long pos = 0;
098        
099        public NativeS3FsInputStream(NativeFileSystemStore store, Statistics statistics, InputStream in, String key) {
100          this.store = store;
101          this.statistics = statistics;
102          this.in = in;
103          this.key = key;
104        }
105        
106        @Override
107        public synchronized int read() throws IOException {
108          int result = -1;
109          try {
110            result = in.read();
111          } catch (IOException e) {
112            LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
113            seek(pos);
114            result = in.read();
115          } 
116          if (result != -1) {
117            pos++;
118          }
119          if (statistics != null && result != -1) {
120            statistics.incrementBytesRead(1);
121          }
122          return result;
123        }
124        @Override
125        public synchronized int read(byte[] b, int off, int len)
126          throws IOException {
127          
128          int result = -1;
129          try {
130            result = in.read(b, off, len);
131          } catch (IOException e) {
132            LOG.info("Received IOException while reading '" + key + "', attempting to reopen.");
133            seek(pos);
134            result = in.read(b, off, len);
135          }
136          if (result > 0) {
137            pos += result;
138          }
139          if (statistics != null && result > 0) {
140            statistics.incrementBytesRead(result);
141          }
142          return result;
143        }
144    
145        @Override
146        public void close() throws IOException {
147          in.close();
148        }
149    
150        @Override
151        public synchronized void seek(long pos) throws IOException {
152          in.close();
153          LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'");
154          in = store.retrieve(key, pos);
155          this.pos = pos;
156        }
157        @Override
158        public synchronized long getPos() throws IOException {
159          return pos;
160        }
161        @Override
162        public boolean seekToNewSource(long targetPos) throws IOException {
163          return false;
164        }
165      }
166      
167      private class NativeS3FsOutputStream extends OutputStream {
168        
169        private Configuration conf;
170        private String key;
171        private File backupFile;
172        private OutputStream backupStream;
173        private MessageDigest digest;
174        private boolean closed;
175        
176        public NativeS3FsOutputStream(Configuration conf,
177            NativeFileSystemStore store, String key, Progressable progress,
178            int bufferSize) throws IOException {
179          this.conf = conf;
180          this.key = key;
181          this.backupFile = newBackupFile();
182          LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
183          try {
184            this.digest = MessageDigest.getInstance("MD5");
185            this.backupStream = new BufferedOutputStream(new DigestOutputStream(
186                new FileOutputStream(backupFile), this.digest));
187          } catch (NoSuchAlgorithmException e) {
188            LOG.warn("Cannot load MD5 digest algorithm," +
189                "skipping message integrity check.", e);
190            this.backupStream = new BufferedOutputStream(
191                new FileOutputStream(backupFile));
192          }
193        }
194    
195        private File newBackupFile() throws IOException {
196          File dir = new File(conf.get("fs.s3.buffer.dir"));
197          if (!dir.mkdirs() && !dir.exists()) {
198            throw new IOException("Cannot create S3 buffer directory: " + dir);
199          }
200          File result = File.createTempFile("output-", ".tmp", dir);
201          result.deleteOnExit();
202          return result;
203        }
204        
205        @Override
206        public void flush() throws IOException {
207          backupStream.flush();
208        }
209        
210        @Override
211        public synchronized void close() throws IOException {
212          if (closed) {
213            return;
214          }
215    
216          backupStream.close();
217          LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
218          
219          try {
220            byte[] md5Hash = digest == null ? null : digest.digest();
221            store.storeFile(key, backupFile, md5Hash);
222          } finally {
223            if (!backupFile.delete()) {
224              LOG.warn("Could not delete temporary s3n file: " + backupFile);
225            }
226            super.close();
227            closed = true;
228          } 
229          LOG.info("OutputStream for key '" + key + "' upload complete");
230        }
231    
232        @Override
233        public void write(int b) throws IOException {
234          backupStream.write(b);
235        }
236    
237        @Override
238        public void write(byte[] b, int off, int len) throws IOException {
239          backupStream.write(b, off, len);
240        }
241      }
242      
243      private URI uri;
244      private NativeFileSystemStore store;
245      private Path workingDir;
246      
247      public NativeS3FileSystem() {
248        // set store in initialize()
249      }
250      
251      public NativeS3FileSystem(NativeFileSystemStore store) {
252        this.store = store;
253      }
254    
255      /**
256       * Return the protocol scheme for the FileSystem.
257       * <p/>
258       *
259       * @return <code>s3n</code>
260       */
261      @Override
262      public String getScheme() {
263        return "s3n";
264      }
265    
266      @Override
267      public void initialize(URI uri, Configuration conf) throws IOException {
268        super.initialize(uri, conf);
269        if (store == null) {
270          store = createDefaultStore(conf);
271        }
272        store.initialize(uri, conf);
273        setConf(conf);
274        this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
275        this.workingDir =
276          new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory());
277      }
278      
279      private static NativeFileSystemStore createDefaultStore(Configuration conf) {
280        NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
281        
282        RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
283            conf.getInt("fs.s3.maxRetries", 4),
284            conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
285        Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
286          new HashMap<Class<? extends Exception>, RetryPolicy>();
287        exceptionToPolicyMap.put(IOException.class, basePolicy);
288        exceptionToPolicyMap.put(S3Exception.class, basePolicy);
289        
290        RetryPolicy methodPolicy = RetryPolicies.retryByException(
291            RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
292        Map<String, RetryPolicy> methodNameToPolicyMap =
293          new HashMap<String, RetryPolicy>();
294        methodNameToPolicyMap.put("storeFile", methodPolicy);
295        methodNameToPolicyMap.put("rename", methodPolicy);
296        
297        return (NativeFileSystemStore)
298          RetryProxy.create(NativeFileSystemStore.class, store,
299              methodNameToPolicyMap);
300      }
301      
302      private static String pathToKey(Path path) {
303        if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
304          // allow uris without trailing slash after bucket to refer to root,
305          // like s3n://mybucket
306          return "";
307        }
308        if (!path.isAbsolute()) {
309          throw new IllegalArgumentException("Path must be absolute: " + path);
310        }
311        String ret = path.toUri().getPath().substring(1); // remove initial slash
312        if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
313          ret = ret.substring(0, ret.length() -1);
314      }
315        return ret;
316      }
317      
318      private static Path keyToPath(String key) {
319        return new Path("/" + key);
320      }
321      
322      private Path makeAbsolute(Path path) {
323        if (path.isAbsolute()) {
324          return path;
325        }
326        return new Path(workingDir, path);
327      }
328    
329      /** This optional operation is not yet supported. */
330      @Override
331      public FSDataOutputStream append(Path f, int bufferSize,
332          Progressable progress) throws IOException {
333        throw new IOException("Not supported");
334      }
335      
336      @Override
337      public FSDataOutputStream create(Path f, FsPermission permission,
338          boolean overwrite, int bufferSize, short replication, long blockSize,
339          Progressable progress) throws IOException {
340    
341        if (exists(f) && !overwrite) {
342          throw new IOException("File already exists:"+f);
343        }
344        
345        if(LOG.isDebugEnabled()) {
346          LOG.debug("Creating new file '" + f + "' in S3");
347        }
348        Path absolutePath = makeAbsolute(f);
349        String key = pathToKey(absolutePath);
350        return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
351            key, progress, bufferSize), statistics);
352      }
353      
354      @Override
355      public boolean delete(Path f, boolean recurse) throws IOException {
356        FileStatus status;
357        try {
358          status = getFileStatus(f);
359        } catch (FileNotFoundException e) {
360          if(LOG.isDebugEnabled()) {
361            LOG.debug("Delete called for '" + f +
362                "' but file does not exist, so returning false");
363          }
364          return false;
365        }
366        Path absolutePath = makeAbsolute(f);
367        String key = pathToKey(absolutePath);
368        if (status.isDirectory()) {
369          if (!recurse && listStatus(f).length > 0) {
370            throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false");
371          }
372    
373          createParent(f);
374    
375          if(LOG.isDebugEnabled()) {
376            LOG.debug("Deleting directory '" + f  + "'");
377          }
378          String priorLastKey = null;
379          do {
380            PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
381            for (FileMetadata file : listing.getFiles()) {
382              store.delete(file.getKey());
383            }
384            priorLastKey = listing.getPriorLastKey();
385          } while (priorLastKey != null);
386    
387          try {
388            store.delete(key + FOLDER_SUFFIX);
389          } catch (FileNotFoundException e) {
390            //this is fine, we don't require a marker
391          }
392        } else {
393          if(LOG.isDebugEnabled()) {
394            LOG.debug("Deleting file '" + f + "'");
395          }
396          createParent(f);
397          store.delete(key);
398        }
399        return true;
400      }
401    
402      @Override
403      public FileStatus getFileStatus(Path f) throws IOException {
404        Path absolutePath = makeAbsolute(f);
405        String key = pathToKey(absolutePath);
406        
407        if (key.length() == 0) { // root always exists
408          return newDirectory(absolutePath);
409        }
410        
411        if(LOG.isDebugEnabled()) {
412          LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
413        }
414        FileMetadata meta = store.retrieveMetadata(key);
415        if (meta != null) {
416          if(LOG.isDebugEnabled()) {
417            LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
418          }
419          return newFile(meta, absolutePath);
420        }
421        if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
422          if(LOG.isDebugEnabled()) {
423            LOG.debug("getFileStatus returning 'directory' for key '" + key +
424                "' as '" + key + FOLDER_SUFFIX + "' exists");
425          }
426          return newDirectory(absolutePath);
427        }
428        
429        if(LOG.isDebugEnabled()) {
430          LOG.debug("getFileStatus listing key '" + key + "'");
431        }
432        PartialListing listing = store.list(key, 1);
433        if (listing.getFiles().length > 0 ||
434            listing.getCommonPrefixes().length > 0) {
435          if(LOG.isDebugEnabled()) {
436            LOG.debug("getFileStatus returning 'directory' for key '" + key +
437                "' as it has contents");
438          }
439          return newDirectory(absolutePath);
440        }
441        
442        if(LOG.isDebugEnabled()) {
443          LOG.debug("getFileStatus could not find key '" + key + "'");
444        }
445        throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
446      }
447    
448      @Override
449      public URI getUri() {
450        return uri;
451      }
452    
453      /**
454       * <p>
455       * If <code>f</code> is a file, this method will make a single call to S3.
456       * If <code>f</code> is a directory, this method will make a maximum of
457       * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
458       * files and directories contained directly in <code>f</code>.
459       * </p>
460       */
461      @Override
462      public FileStatus[] listStatus(Path f) throws IOException {
463    
464        Path absolutePath = makeAbsolute(f);
465        String key = pathToKey(absolutePath);
466        
467        if (key.length() > 0) {
468          FileMetadata meta = store.retrieveMetadata(key);
469          if (meta != null) {
470            return new FileStatus[] { newFile(meta, absolutePath) };
471          }
472        }
473        
474        URI pathUri = absolutePath.toUri();
475        Set<FileStatus> status = new TreeSet<FileStatus>();
476        String priorLastKey = null;
477        do {
478          PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
479          for (FileMetadata fileMetadata : listing.getFiles()) {
480            Path subpath = keyToPath(fileMetadata.getKey());
481            String relativePath = pathUri.relativize(subpath.toUri()).getPath();
482    
483            if (fileMetadata.getKey().equals(key + "/")) {
484              // this is just the directory we have been asked to list
485            }
486            else if (relativePath.endsWith(FOLDER_SUFFIX)) {
487              status.add(newDirectory(new Path(
488                  absolutePath,
489                  relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
490            }
491            else {
492              status.add(newFile(fileMetadata, subpath));
493            }
494          }
495          for (String commonPrefix : listing.getCommonPrefixes()) {
496            Path subpath = keyToPath(commonPrefix);
497            String relativePath = pathUri.relativize(subpath.toUri()).getPath();
498            status.add(newDirectory(new Path(absolutePath, relativePath)));
499          }
500          priorLastKey = listing.getPriorLastKey();
501        } while (priorLastKey != null);
502        
503        if (status.isEmpty() &&
504            key.length() > 0 &&
505            store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
506          throw new FileNotFoundException("File " + f + " does not exist.");
507        }
508        
509        return status.toArray(new FileStatus[status.size()]);
510      }
511      
512      private FileStatus newFile(FileMetadata meta, Path path) {
513        return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
514            meta.getLastModified(), path.makeQualified(this.getUri(), this.getWorkingDirectory()));
515      }
516      
517      private FileStatus newDirectory(Path path) {
518        return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this.getUri(), this.getWorkingDirectory()));
519      }
520    
521      @Override
522      public boolean mkdirs(Path f, FsPermission permission) throws IOException {
523        Path absolutePath = makeAbsolute(f);
524        List<Path> paths = new ArrayList<Path>();
525        do {
526          paths.add(0, absolutePath);
527          absolutePath = absolutePath.getParent();
528        } while (absolutePath != null);
529        
530        boolean result = true;
531        for (Path path : paths) {
532          result &= mkdir(path);
533        }
534        return result;
535      }
536      
537      private boolean mkdir(Path f) throws IOException {
538        try {
539          FileStatus fileStatus = getFileStatus(f);
540          if (fileStatus.isFile()) {
541            throw new IOException(String.format(
542                "Can't make directory for path '%s' since it is a file.", f));
543    
544          }
545        } catch (FileNotFoundException e) {
546          if(LOG.isDebugEnabled()) {
547            LOG.debug("Making dir '" + f + "' in S3");
548          }
549          String key = pathToKey(f) + FOLDER_SUFFIX;
550          store.storeEmptyFile(key);    
551        }
552        return true;
553      }
554    
555      @Override
556      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
557        FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
558        if (fs.isDirectory()) {
559          throw new IOException("'" + f + "' is a directory");
560        }
561        LOG.info("Opening '" + f + "' for reading");
562        Path absolutePath = makeAbsolute(f);
563        String key = pathToKey(absolutePath);
564        return new FSDataInputStream(new BufferedFSInputStream(
565            new NativeS3FsInputStream(store, statistics, store.retrieve(key), key), bufferSize));
566      }
567      
568      // rename() and delete() use this method to ensure that the parent directory
569      // of the source does not vanish.
570      private void createParent(Path path) throws IOException {
571        Path parent = path.getParent();
572        if (parent != null) {
573          String key = pathToKey(makeAbsolute(parent));
574          if (key.length() > 0) {
575              store.storeEmptyFile(key + FOLDER_SUFFIX);
576          }
577        }
578      }
579      
580        
581      @Override
582      public boolean rename(Path src, Path dst) throws IOException {
583    
584        String srcKey = pathToKey(makeAbsolute(src));
585    
586        if (srcKey.length() == 0) {
587          // Cannot rename root of file system
588          return false;
589        }
590    
591        final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
592    
593        // Figure out the final destination
594        String dstKey;
595        try {
596          boolean dstIsFile = getFileStatus(dst).isFile();
597          if (dstIsFile) {
598            if(LOG.isDebugEnabled()) {
599              LOG.debug(debugPreamble +
600                  "returning false as dst is an already existing file");
601            }
602            return false;
603          } else {
604            if(LOG.isDebugEnabled()) {
605              LOG.debug(debugPreamble + "using dst as output directory");
606            }
607            dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
608          }
609        } catch (FileNotFoundException e) {
610          if(LOG.isDebugEnabled()) {
611            LOG.debug(debugPreamble + "using dst as output destination");
612          }
613          dstKey = pathToKey(makeAbsolute(dst));
614          try {
615            if (getFileStatus(dst.getParent()).isFile()) {
616              if(LOG.isDebugEnabled()) {
617                LOG.debug(debugPreamble +
618                    "returning false as dst parent exists and is a file");
619              }
620              return false;
621            }
622          } catch (FileNotFoundException ex) {
623            if(LOG.isDebugEnabled()) {
624              LOG.debug(debugPreamble +
625                  "returning false as dst parent does not exist");
626            }
627            return false;
628          }
629        }
630    
631        boolean srcIsFile;
632        try {
633          srcIsFile = getFileStatus(src).isFile();
634        } catch (FileNotFoundException e) {
635          if(LOG.isDebugEnabled()) {
636            LOG.debug(debugPreamble + "returning false as src does not exist");
637          }
638          return false;
639        }
640        if (srcIsFile) {
641          if(LOG.isDebugEnabled()) {
642            LOG.debug(debugPreamble +
643                "src is file, so doing copy then delete in S3");
644          }
645          store.copy(srcKey, dstKey);
646          store.delete(srcKey);
647        } else {
648          if(LOG.isDebugEnabled()) {
649            LOG.debug(debugPreamble + "src is directory, so copying contents");
650          }
651          store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
652    
653          List<String> keysToDelete = new ArrayList<String>();
654          String priorLastKey = null;
655          do {
656            PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
657            for (FileMetadata file : listing.getFiles()) {
658              keysToDelete.add(file.getKey());
659              store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
660            }
661            priorLastKey = listing.getPriorLastKey();
662          } while (priorLastKey != null);
663    
664          if(LOG.isDebugEnabled()) {
665            LOG.debug(debugPreamble +
666                "all files in src copied, now removing src files");
667          }
668          for (String key: keysToDelete) {
669            store.delete(key);
670          }
671    
672          try {
673            store.delete(srcKey + FOLDER_SUFFIX);
674          } catch (FileNotFoundException e) {
675            //this is fine, we don't require a marker
676          }
677          if(LOG.isDebugEnabled()) {
678            LOG.debug(debugPreamble + "done");
679          }
680        }
681    
682        return true;
683      }
684      
685      @Override
686      public long getDefaultBlockSize() {
687        return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
688      }
689    
690      /**
691       * Set the working directory to the given directory.
692       */
693      @Override
694      public void setWorkingDirectory(Path newDir) {
695        workingDir = newDir;
696      }
697      
698      @Override
699      public Path getWorkingDirectory() {
700        return workingDir;
701      }
702    
703      @Override
704      public String getCanonicalServiceName() {
705        // Does not support Token
706        return null;
707      }
708    }