001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.s3; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.URI; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.TimeUnit; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataInputStream; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.fs.FileStatus; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.fs.permission.FsPermission; 039import org.apache.hadoop.fs.s3native.NativeS3FileSystem; 040import org.apache.hadoop.io.retry.RetryPolicies; 041import org.apache.hadoop.io.retry.RetryPolicy; 042import org.apache.hadoop.io.retry.RetryProxy; 043import org.apache.hadoop.util.Progressable; 044 045/** 046 * <p> 047 * A block-based {@link FileSystem} backed by 048 * <a href="http://aws.amazon.com/s3">Amazon S3</a>. 049 * </p> 050 * @see NativeS3FileSystem 051 */ 052@InterfaceAudience.Public 053@InterfaceStability.Stable 054public class S3FileSystem extends FileSystem { 055 056 private URI uri; 057 058 private FileSystemStore store; 059 060 private Path workingDir; 061 062 public S3FileSystem() { 063 // set store in initialize() 064 } 065 066 public S3FileSystem(FileSystemStore store) { 067 this.store = store; 068 } 069 070 @Override 071 public URI getUri() { 072 return uri; 073 } 074 075 @Override 076 public void initialize(URI uri, Configuration conf) throws IOException { 077 super.initialize(uri, conf); 078 if (store == null) { 079 store = createDefaultStore(conf); 080 } 081 store.initialize(uri, conf); 082 setConf(conf); 083 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 084 this.workingDir = 085 new Path("/user", System.getProperty("user.name")).makeQualified(this); 086 } 087 088 private static FileSystemStore createDefaultStore(Configuration conf) { 089 FileSystemStore store = new Jets3tFileSystemStore(); 090 091 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 092 conf.getInt("fs.s3.maxRetries", 4), 093 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); 094 Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = 095 new HashMap<Class<? extends Exception>, RetryPolicy>(); 096 exceptionToPolicyMap.put(IOException.class, basePolicy); 097 exceptionToPolicyMap.put(S3Exception.class, basePolicy); 098 099 RetryPolicy methodPolicy = RetryPolicies.retryByException( 100 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 101 Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); 102 methodNameToPolicyMap.put("storeBlock", methodPolicy); 103 methodNameToPolicyMap.put("retrieveBlock", methodPolicy); 104 105 return (FileSystemStore) RetryProxy.create(FileSystemStore.class, 106 store, methodNameToPolicyMap); 107 } 108 109 @Override 110 public Path getWorkingDirectory() { 111 return workingDir; 112 } 113 114 @Override 115 public void setWorkingDirectory(Path dir) { 116 workingDir = makeAbsolute(dir); 117 } 118 119 private Path makeAbsolute(Path path) { 120 if (path.isAbsolute()) { 121 return path; 122 } 123 return new Path(workingDir, path); 124 } 125 126 /** 127 * @param permission Currently ignored. 128 */ 129 @Override 130 public boolean mkdirs(Path path, FsPermission permission) throws IOException { 131 Path absolutePath = makeAbsolute(path); 132 List<Path> paths = new ArrayList<Path>(); 133 do { 134 paths.add(0, absolutePath); 135 absolutePath = absolutePath.getParent(); 136 } while (absolutePath != null); 137 138 boolean result = true; 139 for (Path p : paths) { 140 result &= mkdir(p); 141 } 142 return result; 143 } 144 145 private boolean mkdir(Path path) throws IOException { 146 Path absolutePath = makeAbsolute(path); 147 INode inode = store.retrieveINode(absolutePath); 148 if (inode == null) { 149 store.storeINode(absolutePath, INode.DIRECTORY_INODE); 150 } else if (inode.isFile()) { 151 throw new IOException(String.format( 152 "Can't make directory for path %s since it is a file.", 153 absolutePath)); 154 } 155 return true; 156 } 157 158 @Override 159 public boolean isFile(Path path) throws IOException { 160 INode inode = store.retrieveINode(makeAbsolute(path)); 161 if (inode == null) { 162 return false; 163 } 164 return inode.isFile(); 165 } 166 167 private INode checkFile(Path path) throws IOException { 168 INode inode = store.retrieveINode(makeAbsolute(path)); 169 if (inode == null) { 170 throw new IOException("No such file."); 171 } 172 if (inode.isDirectory()) { 173 throw new IOException("Path " + path + " is a directory."); 174 } 175 return inode; 176 } 177 178 @Override 179 public FileStatus[] listStatus(Path f) throws IOException { 180 Path absolutePath = makeAbsolute(f); 181 INode inode = store.retrieveINode(absolutePath); 182 if (inode == null) { 183 throw new FileNotFoundException("File " + f + " does not exist."); 184 } 185 if (inode.isFile()) { 186 return new FileStatus[] { 187 new S3FileStatus(f.makeQualified(this), inode) 188 }; 189 } 190 ArrayList<FileStatus> ret = new ArrayList<FileStatus>(); 191 for (Path p : store.listSubPaths(absolutePath)) { 192 ret.add(getFileStatus(p.makeQualified(this))); 193 } 194 return ret.toArray(new FileStatus[0]); 195 } 196 197 /** This optional operation is not yet supported. */ 198 public FSDataOutputStream append(Path f, int bufferSize, 199 Progressable progress) throws IOException { 200 throw new IOException("Not supported"); 201 } 202 203 /** 204 * @param permission Currently ignored. 205 */ 206 @Override 207 public FSDataOutputStream create(Path file, FsPermission permission, 208 boolean overwrite, int bufferSize, 209 short replication, long blockSize, Progressable progress) 210 throws IOException { 211 212 INode inode = store.retrieveINode(makeAbsolute(file)); 213 if (inode != null) { 214 if (overwrite) { 215 delete(file, true); 216 } else { 217 throw new IOException("File already exists: " + file); 218 } 219 } else { 220 Path parent = file.getParent(); 221 if (parent != null) { 222 if (!mkdirs(parent)) { 223 throw new IOException("Mkdirs failed to create " + parent.toString()); 224 } 225 } 226 } 227 return new FSDataOutputStream 228 (new S3OutputStream(getConf(), store, makeAbsolute(file), 229 blockSize, progress, bufferSize), 230 statistics); 231 } 232 233 @Override 234 public FSDataInputStream open(Path path, int bufferSize) throws IOException { 235 INode inode = checkFile(path); 236 return new FSDataInputStream(new S3InputStream(getConf(), store, inode, 237 statistics)); 238 } 239 240 @Override 241 public boolean rename(Path src, Path dst) throws IOException { 242 Path absoluteSrc = makeAbsolute(src); 243 INode srcINode = store.retrieveINode(absoluteSrc); 244 if (srcINode == null) { 245 // src path doesn't exist 246 return false; 247 } 248 Path absoluteDst = makeAbsolute(dst); 249 INode dstINode = store.retrieveINode(absoluteDst); 250 if (dstINode != null && dstINode.isDirectory()) { 251 absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); 252 dstINode = store.retrieveINode(absoluteDst); 253 } 254 if (dstINode != null) { 255 // dst path already exists - can't overwrite 256 return false; 257 } 258 Path dstParent = absoluteDst.getParent(); 259 if (dstParent != null) { 260 INode dstParentINode = store.retrieveINode(dstParent); 261 if (dstParentINode == null || dstParentINode.isFile()) { 262 // dst parent doesn't exist or is a file 263 return false; 264 } 265 } 266 return renameRecursive(absoluteSrc, absoluteDst); 267 } 268 269 private boolean renameRecursive(Path src, Path dst) throws IOException { 270 INode srcINode = store.retrieveINode(src); 271 store.storeINode(dst, srcINode); 272 store.deleteINode(src); 273 if (srcINode.isDirectory()) { 274 for (Path oldSrc : store.listDeepSubPaths(src)) { 275 INode inode = store.retrieveINode(oldSrc); 276 if (inode == null) { 277 return false; 278 } 279 String oldSrcPath = oldSrc.toUri().getPath(); 280 String srcPath = src.toUri().getPath(); 281 String dstPath = dst.toUri().getPath(); 282 Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); 283 store.storeINode(newDst, inode); 284 store.deleteINode(oldSrc); 285 } 286 } 287 return true; 288 } 289 290 public boolean delete(Path path, boolean recursive) throws IOException { 291 Path absolutePath = makeAbsolute(path); 292 INode inode = store.retrieveINode(absolutePath); 293 if (inode == null) { 294 return false; 295 } 296 if (inode.isFile()) { 297 store.deleteINode(absolutePath); 298 for (Block block: inode.getBlocks()) { 299 store.deleteBlock(block); 300 } 301 } else { 302 FileStatus[] contents = null; 303 try { 304 contents = listStatus(absolutePath); 305 } catch(FileNotFoundException fnfe) { 306 return false; 307 } 308 309 if ((contents.length !=0) && (!recursive)) { 310 throw new IOException("Directory " + path.toString() 311 + " is not empty."); 312 } 313 for (FileStatus p:contents) { 314 if (!delete(p.getPath(), recursive)) { 315 return false; 316 } 317 } 318 store.deleteINode(absolutePath); 319 } 320 return true; 321 } 322 323 /** 324 * FileStatus for S3 file systems. 325 */ 326 @Override 327 public FileStatus getFileStatus(Path f) throws IOException { 328 INode inode = store.retrieveINode(makeAbsolute(f)); 329 if (inode == null) { 330 throw new FileNotFoundException(f + ": No such file or directory."); 331 } 332 return new S3FileStatus(f.makeQualified(this), inode); 333 } 334 335 @Override 336 public long getDefaultBlockSize() { 337 return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024); 338 } 339 340 // diagnostic methods 341 342 void dump() throws IOException { 343 store.dump(); 344 } 345 346 void purge() throws IOException { 347 store.purge(); 348 } 349 350 private static class S3FileStatus extends FileStatus { 351 352 S3FileStatus(Path f, INode inode) throws IOException { 353 super(findLength(inode), inode.isDirectory(), 1, 354 findBlocksize(inode), 0, f); 355 } 356 357 private static long findLength(INode inode) { 358 if (!inode.isDirectory()) { 359 long length = 0L; 360 for (Block block : inode.getBlocks()) { 361 length += block.getLength(); 362 } 363 return length; 364 } 365 return 0; 366 } 367 368 private static long findBlocksize(INode inode) { 369 final Block[] ret = inode.getBlocks(); 370 return ret == null ? 0L : ret[0].getLength(); 371 } 372 } 373}