001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.s3; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.URI; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.TimeUnit; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataInputStream; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.fs.FileAlreadyExistsException; 036import org.apache.hadoop.fs.FileStatus; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.fs.permission.FsPermission; 040import org.apache.hadoop.fs.s3native.NativeS3FileSystem; 041import org.apache.hadoop.io.retry.RetryPolicies; 042import org.apache.hadoop.io.retry.RetryPolicy; 043import org.apache.hadoop.io.retry.RetryProxy; 044import org.apache.hadoop.util.Progressable; 045 046/** 047 * <p> 048 * A block-based {@link FileSystem} backed by 049 * <a href="http://aws.amazon.com/s3">Amazon S3</a>. 050 * </p> 051 * @see NativeS3FileSystem 052 */ 053@InterfaceAudience.Public 054@InterfaceStability.Stable 055public class S3FileSystem extends FileSystem { 056 057 private URI uri; 058 059 private FileSystemStore store; 060 061 private Path workingDir; 062 063 public S3FileSystem() { 064 // set store in initialize() 065 } 066 067 public S3FileSystem(FileSystemStore store) { 068 this.store = store; 069 } 070 071 /** 072 * Return the protocol scheme for the FileSystem. 073 * <p/> 074 * 075 * @return <code>s3</code> 076 */ 077 @Override 078 public String getScheme() { 079 return "s3"; 080 } 081 082 @Override 083 public URI getUri() { 084 return uri; 085 } 086 087 @Override 088 public void initialize(URI uri, Configuration conf) throws IOException { 089 super.initialize(uri, conf); 090 if (store == null) { 091 store = createDefaultStore(conf); 092 } 093 store.initialize(uri, conf); 094 setConf(conf); 095 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 096 this.workingDir = 097 new Path("/user", System.getProperty("user.name")).makeQualified(this); 098 } 099 100 private static FileSystemStore createDefaultStore(Configuration conf) { 101 FileSystemStore store = new Jets3tFileSystemStore(); 102 103 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 104 conf.getInt("fs.s3.maxRetries", 4), 105 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); 106 Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = 107 new HashMap<Class<? extends Exception>, RetryPolicy>(); 108 exceptionToPolicyMap.put(IOException.class, basePolicy); 109 exceptionToPolicyMap.put(S3Exception.class, basePolicy); 110 111 RetryPolicy methodPolicy = RetryPolicies.retryByException( 112 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 113 Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); 114 methodNameToPolicyMap.put("storeBlock", methodPolicy); 115 methodNameToPolicyMap.put("retrieveBlock", methodPolicy); 116 117 return (FileSystemStore) RetryProxy.create(FileSystemStore.class, 118 store, methodNameToPolicyMap); 119 } 120 121 @Override 122 public Path getWorkingDirectory() { 123 return workingDir; 124 } 125 126 @Override 127 public void setWorkingDirectory(Path dir) { 128 workingDir = makeAbsolute(dir); 129 } 130 131 private Path makeAbsolute(Path path) { 132 if (path.isAbsolute()) { 133 return path; 134 } 135 return new Path(workingDir, path); 136 } 137 138 /** 139 * @param permission Currently ignored. 140 */ 141 @Override 142 public boolean mkdirs(Path path, FsPermission permission) throws IOException { 143 Path absolutePath = makeAbsolute(path); 144 List<Path> paths = new ArrayList<Path>(); 145 do { 146 paths.add(0, absolutePath); 147 absolutePath = absolutePath.getParent(); 148 } while (absolutePath != null); 149 150 boolean result = true; 151 for (Path p : paths) { 152 result &= mkdir(p); 153 } 154 return result; 155 } 156 157 private boolean mkdir(Path path) throws IOException { 158 Path absolutePath = makeAbsolute(path); 159 INode inode = store.retrieveINode(absolutePath); 160 if (inode == null) { 161 store.storeINode(absolutePath, INode.DIRECTORY_INODE); 162 } else if (inode.isFile()) { 163 throw new IOException(String.format( 164 "Can't make directory for path %s since it is a file.", 165 absolutePath)); 166 } 167 return true; 168 } 169 170 @Override 171 public boolean isFile(Path path) throws IOException { 172 INode inode = store.retrieveINode(makeAbsolute(path)); 173 if (inode == null) { 174 return false; 175 } 176 return inode.isFile(); 177 } 178 179 private INode checkFile(Path path) throws IOException { 180 INode inode = store.retrieveINode(makeAbsolute(path)); 181 if (inode == null) { 182 throw new IOException("No such file."); 183 } 184 if (inode.isDirectory()) { 185 throw new IOException("Path " + path + " is a directory."); 186 } 187 return inode; 188 } 189 190 @Override 191 public FileStatus[] listStatus(Path f) throws IOException { 192 Path absolutePath = makeAbsolute(f); 193 INode inode = store.retrieveINode(absolutePath); 194 if (inode == null) { 195 throw new FileNotFoundException("File " + f + " does not exist."); 196 } 197 if (inode.isFile()) { 198 return new FileStatus[] { 199 new S3FileStatus(f.makeQualified(this), inode) 200 }; 201 } 202 ArrayList<FileStatus> ret = new ArrayList<FileStatus>(); 203 for (Path p : store.listSubPaths(absolutePath)) { 204 ret.add(getFileStatus(p.makeQualified(this))); 205 } 206 return ret.toArray(new FileStatus[0]); 207 } 208 209 /** This optional operation is not yet supported. */ 210 @Override 211 public FSDataOutputStream append(Path f, int bufferSize, 212 Progressable progress) throws IOException { 213 throw new IOException("Not supported"); 214 } 215 216 /** 217 * @param permission Currently ignored. 218 */ 219 @Override 220 public FSDataOutputStream create(Path file, FsPermission permission, 221 boolean overwrite, int bufferSize, 222 short replication, long blockSize, Progressable progress) 223 throws IOException { 224 225 INode inode = store.retrieveINode(makeAbsolute(file)); 226 if (inode != null) { 227 if (overwrite) { 228 delete(file, true); 229 } else { 230 throw new FileAlreadyExistsException("File already exists: " + file); 231 } 232 } else { 233 Path parent = file.getParent(); 234 if (parent != null) { 235 if (!mkdirs(parent)) { 236 throw new IOException("Mkdirs failed to create " + parent.toString()); 237 } 238 } 239 } 240 return new FSDataOutputStream 241 (new S3OutputStream(getConf(), store, makeAbsolute(file), 242 blockSize, progress, bufferSize), 243 statistics); 244 } 245 246 @Override 247 public FSDataInputStream open(Path path, int bufferSize) throws IOException { 248 INode inode = checkFile(path); 249 return new FSDataInputStream(new S3InputStream(getConf(), store, inode, 250 statistics)); 251 } 252 253 @Override 254 public boolean rename(Path src, Path dst) throws IOException { 255 Path absoluteSrc = makeAbsolute(src); 256 INode srcINode = store.retrieveINode(absoluteSrc); 257 if (srcINode == null) { 258 // src path doesn't exist 259 return false; 260 } 261 Path absoluteDst = makeAbsolute(dst); 262 INode dstINode = store.retrieveINode(absoluteDst); 263 if (dstINode != null && dstINode.isDirectory()) { 264 absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); 265 dstINode = store.retrieveINode(absoluteDst); 266 } 267 if (dstINode != null) { 268 // dst path already exists - can't overwrite 269 return false; 270 } 271 Path dstParent = absoluteDst.getParent(); 272 if (dstParent != null) { 273 INode dstParentINode = store.retrieveINode(dstParent); 274 if (dstParentINode == null || dstParentINode.isFile()) { 275 // dst parent doesn't exist or is a file 276 return false; 277 } 278 } 279 return renameRecursive(absoluteSrc, absoluteDst); 280 } 281 282 private boolean renameRecursive(Path src, Path dst) throws IOException { 283 INode srcINode = store.retrieveINode(src); 284 store.storeINode(dst, srcINode); 285 store.deleteINode(src); 286 if (srcINode.isDirectory()) { 287 for (Path oldSrc : store.listDeepSubPaths(src)) { 288 INode inode = store.retrieveINode(oldSrc); 289 if (inode == null) { 290 return false; 291 } 292 String oldSrcPath = oldSrc.toUri().getPath(); 293 String srcPath = src.toUri().getPath(); 294 String dstPath = dst.toUri().getPath(); 295 Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); 296 store.storeINode(newDst, inode); 297 store.deleteINode(oldSrc); 298 } 299 } 300 return true; 301 } 302 303 @Override 304 public boolean delete(Path path, boolean recursive) throws IOException { 305 Path absolutePath = makeAbsolute(path); 306 INode inode = store.retrieveINode(absolutePath); 307 if (inode == null) { 308 return false; 309 } 310 if (inode.isFile()) { 311 store.deleteINode(absolutePath); 312 for (Block block: inode.getBlocks()) { 313 store.deleteBlock(block); 314 } 315 } else { 316 FileStatus[] contents = null; 317 try { 318 contents = listStatus(absolutePath); 319 } catch(FileNotFoundException fnfe) { 320 return false; 321 } 322 323 if ((contents.length !=0) && (!recursive)) { 324 throw new IOException("Directory " + path.toString() 325 + " is not empty."); 326 } 327 for (FileStatus p:contents) { 328 if (!delete(p.getPath(), recursive)) { 329 return false; 330 } 331 } 332 store.deleteINode(absolutePath); 333 } 334 return true; 335 } 336 337 /** 338 * FileStatus for S3 file systems. 339 */ 340 @Override 341 public FileStatus getFileStatus(Path f) throws IOException { 342 INode inode = store.retrieveINode(makeAbsolute(f)); 343 if (inode == null) { 344 throw new FileNotFoundException(f + ": No such file or directory."); 345 } 346 return new S3FileStatus(f.makeQualified(this), inode); 347 } 348 349 @Override 350 public long getDefaultBlockSize() { 351 return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024); 352 } 353 354 @Override 355 public String getCanonicalServiceName() { 356 // Does not support Token 357 return null; 358 } 359 360 // diagnostic methods 361 362 void dump() throws IOException { 363 store.dump(); 364 } 365 366 void purge() throws IOException { 367 store.purge(); 368 } 369 370 private static class S3FileStatus extends FileStatus { 371 372 S3FileStatus(Path f, INode inode) throws IOException { 373 super(findLength(inode), inode.isDirectory(), 1, 374 findBlocksize(inode), 0, f); 375 } 376 377 private static long findLength(INode inode) { 378 if (!inode.isDirectory()) { 379 long length = 0L; 380 for (Block block : inode.getBlocks()) { 381 length += block.getLength(); 382 } 383 return length; 384 } 385 return 0; 386 } 387 388 private static long findBlocksize(INode inode) { 389 final Block[] ret = inode.getBlocks(); 390 return ret == null ? 0L : ret[0].getLength(); 391 } 392 } 393}