/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A block-based {@link FileSystem} backed by
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 * </p>
 * @see NativeS3FileSystem
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class S3FileSystem extends FileSystem {

  private URI uri;

  private FileSystemStore store;

  private Path workingDir;

  public S3FileSystem() {
    // set store in initialize()
  }

  public S3FileSystem(FileSystemStore store) {
    this.store = store;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   * <p/>
   *
   * @return <code>s3</code>
   */
  @Override
  public String getScheme() {
    return "s3";
  }

  @Override
  public URI getUri() {
    return uri;
  }

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir =
        new Path("/user", System.getProperty("user.name")).makeQualified(this);
  }

  private static FileSystemStore createDefaultStore(Configuration conf) {
    FileSystemStore store = new Jets3tFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt("fs.s3.maxRetries", 4),
        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
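    // Retry IOException and S3Exception with the fixed-sleep policy above;
    // any other exception thrown by storeBlock/retrieveBlock fails immediately
    // (TRY_ONCE_THEN_FAIL).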
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap =
        new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("storeBlock", methodPolicy);
    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);

    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
        store, methodNameToPolicyMap);
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public void setWorkingDirectory(Path dir) {
    workingDir = makeAbsolute(dir);
  }

  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * @param permission Currently ignored.
   */
  @Override
  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(path);
    List<Path> paths = new ArrayList<Path>();
    do {
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (Path p : paths) {
      result &= mkdir(p);
    }
    return result;
  }

  private boolean mkdir(Path path) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
    } else if (inode.isFile()) {
      throw new IOException(String.format(
          "Can't make directory for path %s since it is a file.",
          absolutePath));
    }
    return true;
  }

  @Override
  public boolean isFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      return false;
    }
    return inode.isFile();
  }

  private INode checkFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      throw new IOException("No such file.");
    }
    if (inode.isDirectory()) {
      throw new IOException("Path " + path + " is a directory.");
    }
    return inode;
  }

  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }
    if (inode.isFile()) {
      return new FileStatus[] {
        new S3FileStatus(f.makeQualified(this), inode)
      };
    }
    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
    for (Path p : store.listSubPaths(absolutePath)) {
      ret.add(getFileStatus(p.makeQualified(this)));
    }
    return ret.toArray(new FileStatus[0]);
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * @param permission Currently ignored.
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize,
      short replication, long blockSize, Progressable progress)
      throws IOException {

    INode inode = store.retrieveINode(makeAbsolute(file));
    if (inode != null) {
      if (overwrite) {
        delete(file, true);
      } else {
        throw new IOException("File already exists: " + file);
      }
    } else {
      Path parent = file.getParent();
      if (parent != null) {
        if (!mkdirs(parent)) {
          throw new IOException("Mkdirs failed to create " + parent.toString());
        }
      }
    }
    return new FSDataOutputStream(
        new S3OutputStream(getConf(), store, makeAbsolute(file),
            blockSize, progress, bufferSize),
        statistics);
  }

  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    INode inode = checkFile(path);
    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
        statistics));
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    Path absoluteSrc = makeAbsolute(src);
    INode srcINode = store.retrieveINode(absoluteSrc);
    if (srcINode == null) {
      // src path doesn't exist
      return false;
    }
    Path absoluteDst = makeAbsolute(dst);
    INode dstINode = store.retrieveINode(absoluteDst);
    if (dstINode != null && dstINode.isDirectory()) {
      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
      dstINode = store.retrieveINode(absoluteDst);
    }
    if (dstINode != null) {
      // dst path already exists - can't overwrite
      return false;
    }
    Path dstParent = absoluteDst.getParent();
    if (dstParent != null) {
      INode dstParentINode = store.retrieveINode(dstParent);
      if (dstParentINode == null || dstParentINode.isFile()) {
        // dst parent doesn't exist or is a file
        return false;
      }
    }
    return renameRecursive(absoluteSrc, absoluteDst);
  }

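  /*
   * Rename is implemented by storing the source INode under the destination
   * path and deleting the original; for a directory every deep sub-path is
   * moved the same way, one entry at a time rather than atomically.
   */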
  private boolean renameRecursive(Path src, Path dst) throws IOException {
    INode srcINode = store.retrieveINode(src);
    store.storeINode(dst, srcINode);
    store.deleteINode(src);
    if (srcINode.isDirectory()) {
      for (Path oldSrc : store.listDeepSubPaths(src)) {
        INode inode = store.retrieveINode(oldSrc);
        if (inode == null) {
          return false;
        }
        String oldSrcPath = oldSrc.toUri().getPath();
        String srcPath = src.toUri().getPath();
        String dstPath = dst.toUri().getPath();
        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
        store.storeINode(newDst, inode);
        store.deleteINode(oldSrc);
      }
    }
    return true;
  }

  @Override
  public boolean delete(Path path, boolean recursive) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      return false;
    }
    if (inode.isFile()) {
      store.deleteINode(absolutePath);
      for (Block block : inode.getBlocks()) {
        store.deleteBlock(block);
      }
    } else {
      FileStatus[] contents = null;
      try {
        contents = listStatus(absolutePath);
      } catch (FileNotFoundException fnfe) {
        return false;
      }

      if ((contents.length != 0) && (!recursive)) {
        throw new IOException("Directory " + path.toString()
            + " is not empty.");
      }
      for (FileStatus p : contents) {
        if (!delete(p.getPath(), recursive)) {
          return false;
        }
      }
      store.deleteINode(absolutePath);
    }
    return true;
  }

  /**
   * FileStatus for S3 file systems.
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(f));
    if (inode == null) {
      throw new FileNotFoundException(f + ": No such file or directory.");
    }
    return new S3FileStatus(f.makeQualified(this), inode);
  }

  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }

  // diagnostic methods

  void dump() throws IOException {
    store.dump();
  }

  void purge() throws IOException {
    store.purge();
  }

  private static class S3FileStatus extends FileStatus {

    S3FileStatus(Path f, INode inode) throws IOException {
      super(findLength(inode), inode.isDirectory(), 1,
          findBlocksize(inode), 0, f);
    }

    private static long findLength(INode inode) {
      if (!inode.isDirectory()) {
        long length = 0L;
        for (Block block : inode.getBlocks()) {
          length += block.getLength();
        }
        return length;
      }
      return 0;
    }

    private static long findBlocksize(INode inode) {
      final Block[] ret = inode.getBlocks();
      return ret == null ? 0L : ret[0].getLength();
    }
  }
}