001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.s3; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.URI; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.TimeUnit; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataInputStream; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.fs.FileAlreadyExistsException; 036import org.apache.hadoop.fs.FileStatus; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.fs.permission.FsPermission; 040import org.apache.hadoop.fs.s3native.NativeS3FileSystem; 041import org.apache.hadoop.io.retry.RetryPolicies; 042import org.apache.hadoop.io.retry.RetryPolicy; 043import org.apache.hadoop.io.retry.RetryProxy; 044import org.apache.hadoop.util.Progressable; 045 046/** 047 * A block-based {@link FileSystem} backed by 048 * <a href="http://aws.amazon.com/s3">Amazon S3</a>. 049 * 050 * @see NativeS3FileSystem 051 */ 052@InterfaceAudience.Public 053@InterfaceStability.Stable 054public class S3FileSystem extends FileSystem { 055 056 private URI uri; 057 058 private FileSystemStore store; 059 060 private Path workingDir; 061 062 public S3FileSystem() { 063 // set store in initialize() 064 } 065 066 public S3FileSystem(FileSystemStore store) { 067 this.store = store; 068 } 069 070 /** 071 * Return the protocol scheme for the FileSystem. 072 * 073 * @return <code>s3</code> 074 */ 075 @Override 076 public String getScheme() { 077 return "s3"; 078 } 079 080 @Override 081 public URI getUri() { 082 return uri; 083 } 084 085 @Override 086 public void initialize(URI uri, Configuration conf) throws IOException { 087 super.initialize(uri, conf); 088 if (store == null) { 089 store = createDefaultStore(conf); 090 } 091 store.initialize(uri, conf); 092 setConf(conf); 093 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 094 this.workingDir = 095 new Path("/user", System.getProperty("user.name")).makeQualified(this); 096 } 097 098 private static FileSystemStore createDefaultStore(Configuration conf) { 099 FileSystemStore store = new Jets3tFileSystemStore(); 100 101 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 102 conf.getInt("fs.s3.maxRetries", 4), 103 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); 104 Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = 105 new HashMap<Class<? extends Exception>, RetryPolicy>(); 106 exceptionToPolicyMap.put(IOException.class, basePolicy); 107 exceptionToPolicyMap.put(S3Exception.class, basePolicy); 108 109 RetryPolicy methodPolicy = RetryPolicies.retryByException( 110 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 111 Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); 112 methodNameToPolicyMap.put("storeBlock", methodPolicy); 113 methodNameToPolicyMap.put("retrieveBlock", methodPolicy); 114 115 return (FileSystemStore) RetryProxy.create(FileSystemStore.class, 116 store, methodNameToPolicyMap); 117 } 118 119 @Override 120 public Path getWorkingDirectory() { 121 return workingDir; 122 } 123 124 @Override 125 public void setWorkingDirectory(Path dir) { 126 workingDir = makeAbsolute(dir); 127 } 128 129 private Path makeAbsolute(Path path) { 130 if (path.isAbsolute()) { 131 return path; 132 } 133 return new Path(workingDir, path); 134 } 135 136 /** 137 * @param permission Currently ignored. 138 */ 139 @Override 140 public boolean mkdirs(Path path, FsPermission permission) throws IOException { 141 Path absolutePath = makeAbsolute(path); 142 List<Path> paths = new ArrayList<Path>(); 143 do { 144 paths.add(0, absolutePath); 145 absolutePath = absolutePath.getParent(); 146 } while (absolutePath != null); 147 148 boolean result = true; 149 for (Path p : paths) { 150 result &= mkdir(p); 151 } 152 return result; 153 } 154 155 private boolean mkdir(Path path) throws IOException { 156 Path absolutePath = makeAbsolute(path); 157 INode inode = store.retrieveINode(absolutePath); 158 if (inode == null) { 159 store.storeINode(absolutePath, INode.DIRECTORY_INODE); 160 } else if (inode.isFile()) { 161 throw new IOException(String.format( 162 "Can't make directory for path %s since it is a file.", 163 absolutePath)); 164 } 165 return true; 166 } 167 168 @Override 169 public boolean isFile(Path path) throws IOException { 170 INode inode = store.retrieveINode(makeAbsolute(path)); 171 if (inode == null) { 172 return false; 173 } 174 return inode.isFile(); 175 } 176 177 private INode checkFile(Path path) throws IOException { 178 INode inode = store.retrieveINode(makeAbsolute(path)); 179 if (inode == null) { 180 throw new IOException("No such file."); 181 } 182 if (inode.isDirectory()) { 183 throw new IOException("Path " + path + " is a directory."); 184 } 185 return inode; 186 } 187 188 @Override 189 public FileStatus[] listStatus(Path f) throws IOException { 190 Path absolutePath = makeAbsolute(f); 191 INode inode = store.retrieveINode(absolutePath); 192 if (inode == null) { 193 throw new FileNotFoundException("File " + f + " does not exist."); 194 } 195 if (inode.isFile()) { 196 return new FileStatus[] { 197 new S3FileStatus(f.makeQualified(this), inode) 198 }; 199 } 200 ArrayList<FileStatus> ret = new ArrayList<FileStatus>(); 201 for (Path p : store.listSubPaths(absolutePath)) { 202 ret.add(getFileStatus(p.makeQualified(this))); 203 } 204 return ret.toArray(new FileStatus[0]); 205 } 206 207 /** This optional operation is not yet supported. */ 208 @Override 209 public FSDataOutputStream append(Path f, int bufferSize, 210 Progressable progress) throws IOException { 211 throw new IOException("Not supported"); 212 } 213 214 /** 215 * @param permission Currently ignored. 216 */ 217 @Override 218 public FSDataOutputStream create(Path file, FsPermission permission, 219 boolean overwrite, int bufferSize, 220 short replication, long blockSize, Progressable progress) 221 throws IOException { 222 223 INode inode = store.retrieveINode(makeAbsolute(file)); 224 if (inode != null) { 225 if (overwrite) { 226 delete(file, true); 227 } else { 228 throw new FileAlreadyExistsException("File already exists: " + file); 229 } 230 } else { 231 Path parent = file.getParent(); 232 if (parent != null) { 233 if (!mkdirs(parent)) { 234 throw new IOException("Mkdirs failed to create " + parent.toString()); 235 } 236 } 237 } 238 return new FSDataOutputStream 239 (new S3OutputStream(getConf(), store, makeAbsolute(file), 240 blockSize, progress, bufferSize), 241 statistics); 242 } 243 244 @Override 245 public FSDataInputStream open(Path path, int bufferSize) throws IOException { 246 INode inode = checkFile(path); 247 return new FSDataInputStream(new S3InputStream(getConf(), store, inode, 248 statistics)); 249 } 250 251 @Override 252 public boolean rename(Path src, Path dst) throws IOException { 253 Path absoluteSrc = makeAbsolute(src); 254 INode srcINode = store.retrieveINode(absoluteSrc); 255 if (srcINode == null) { 256 // src path doesn't exist 257 return false; 258 } 259 Path absoluteDst = makeAbsolute(dst); 260 INode dstINode = store.retrieveINode(absoluteDst); 261 if (dstINode != null && dstINode.isDirectory()) { 262 absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); 263 dstINode = store.retrieveINode(absoluteDst); 264 } 265 if (dstINode != null) { 266 // dst path already exists - can't overwrite 267 return false; 268 } 269 Path dstParent = absoluteDst.getParent(); 270 if (dstParent != null) { 271 INode dstParentINode = store.retrieveINode(dstParent); 272 if (dstParentINode == null || dstParentINode.isFile()) { 273 // dst parent doesn't exist or is a file 274 return false; 275 } 276 } 277 return renameRecursive(absoluteSrc, absoluteDst); 278 } 279 280 private boolean renameRecursive(Path src, Path dst) throws IOException { 281 INode srcINode = store.retrieveINode(src); 282 store.storeINode(dst, srcINode); 283 store.deleteINode(src); 284 if (srcINode.isDirectory()) { 285 for (Path oldSrc : store.listDeepSubPaths(src)) { 286 INode inode = store.retrieveINode(oldSrc); 287 if (inode == null) { 288 return false; 289 } 290 String oldSrcPath = oldSrc.toUri().getPath(); 291 String srcPath = src.toUri().getPath(); 292 String dstPath = dst.toUri().getPath(); 293 Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); 294 store.storeINode(newDst, inode); 295 store.deleteINode(oldSrc); 296 } 297 } 298 return true; 299 } 300 301 @Override 302 public boolean delete(Path path, boolean recursive) throws IOException { 303 Path absolutePath = makeAbsolute(path); 304 INode inode = store.retrieveINode(absolutePath); 305 if (inode == null) { 306 return false; 307 } 308 if (inode.isFile()) { 309 store.deleteINode(absolutePath); 310 for (Block block: inode.getBlocks()) { 311 store.deleteBlock(block); 312 } 313 } else { 314 FileStatus[] contents = null; 315 try { 316 contents = listStatus(absolutePath); 317 } catch(FileNotFoundException fnfe) { 318 return false; 319 } 320 321 if ((contents.length !=0) && (!recursive)) { 322 throw new IOException("Directory " + path.toString() 323 + " is not empty."); 324 } 325 for (FileStatus p:contents) { 326 if (!delete(p.getPath(), recursive)) { 327 return false; 328 } 329 } 330 store.deleteINode(absolutePath); 331 } 332 return true; 333 } 334 335 /** 336 * FileStatus for S3 file systems. 337 */ 338 @Override 339 public FileStatus getFileStatus(Path f) throws IOException { 340 INode inode = store.retrieveINode(makeAbsolute(f)); 341 if (inode == null) { 342 throw new FileNotFoundException(f + ": No such file or directory."); 343 } 344 return new S3FileStatus(f.makeQualified(this), inode); 345 } 346 347 @Override 348 public long getDefaultBlockSize() { 349 return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024); 350 } 351 352 @Override 353 public String getCanonicalServiceName() { 354 // Does not support Token 355 return null; 356 } 357 358 // diagnostic methods 359 360 void dump() throws IOException { 361 store.dump(); 362 } 363 364 void purge() throws IOException { 365 store.purge(); 366 } 367 368 private static class S3FileStatus extends FileStatus { 369 370 S3FileStatus(Path f, INode inode) throws IOException { 371 super(findLength(inode), inode.isDirectory(), 1, 372 findBlocksize(inode), 0, f); 373 } 374 375 private static long findLength(INode inode) { 376 if (!inode.isDirectory()) { 377 long length = 0L; 378 for (Block block : inode.getBlocks()) { 379 length += block.getLength(); 380 } 381 return length; 382 } 383 return 0; 384 } 385 386 private static long findBlocksize(INode inode) { 387 final Block[] ret = inode.getBlocks(); 388 return ret == null ? 0L : ret[0].getLength(); 389 } 390 } 391}