/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;

/**
 * A block-based {@link FileSystem} backed by
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 *
 * @see NativeS3FileSystem
 * @deprecated Use {@link NativeS3FileSystem} and {@link S3AFileSystem} instead.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
@Deprecated
public class S3FileSystem extends FileSystem {

  private static final AtomicBoolean hasWarnedDeprecation
      = new AtomicBoolean(false);

  private URI uri;

  private FileSystemStore store;

  private Path workingDir;

  public S3FileSystem() {
    // set store in initialize()
  }

  public S3FileSystem(FileSystemStore store) {
    this.store = store;
  }

  /**
   * Warn once per JVM, the first time an S3FileSystem is created,
   * that this implementation is deprecated.
   */
  private static void warnDeprecation() {
    if (!hasWarnedDeprecation.getAndSet(true)) {
      LOG.warn("S3FileSystem is deprecated and will be removed in "
          + "future releases. Use NativeS3FileSystem or S3AFileSystem instead.");
    }
  }
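
  /*
   * For illustration: callers normally do not construct this class directly
   * but obtain it through the FileSystem factory with an s3:// URI. A minimal
   * sketch (the bucket name is hypothetical, and this assumes fs.s3.impl has
   * not been overridden in the configuration):
   *
   *   Configuration conf = new Configuration();
   *   FileSystem fs = FileSystem.get(URI.create("s3://example-bucket/"), conf);
   */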

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>s3</code>
   */
  @Override
  public String getScheme() {
    return "s3";
  }

  @Override
  public URI getUri() {
    return uri;
  }

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    warnDeprecation();
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = S3xLoginHelper.buildFSURI(uri);
    this.workingDir =
        new Path("/user", System.getProperty("user.name")).makeQualified(this);
  }

  private static FileSystemStore createDefaultStore(Configuration conf) {
    FileSystemStore store = new Jets3tFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt("fs.s3.maxRetries", 4),
        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap =
        new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("storeBlock", methodPolicy);
    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);

    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
        store, methodNameToPolicyMap);
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public void setWorkingDirectory(Path dir) {
    workingDir = makeAbsolute(dir);
  }

  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * Check that a Path belongs to this FileSystem.
   * Unlike the superclass, this version does not look at authority,
   * only hostnames.
   * @param path to check
   * @throws IllegalArgumentException if there is an FS mismatch
   */
  @Override
  protected void checkPath(Path path) {
    S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
  }

  @Override
  protected URI canonicalizeUri(URI rawUri) {
    return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
  }
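
  /*
   * For illustration, the retry behaviour wired up in createDefaultStore()
   * above can be tuned through the Configuration passed to initialize().
   * The values below are examples, not recommendations:
   *
   *   Configuration conf = new Configuration();
   *   conf.setInt("fs.s3.maxRetries", 8);        // max retries of storeBlock/retrieveBlock
   *   conf.setLong("fs.s3.sleepTimeSeconds", 5); // fixed sleep between retries, in seconds
   */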

  /**
   * @param permission Currently ignored.
   */
  @Override
  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(path);
    // Collect the path and all of its ancestors, root first, so each
    // directory level is created (or verified) from the top down.
    List<Path> paths = new ArrayList<Path>();
    do {
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (int i = 0; i < paths.size(); i++) {
      Path p = paths.get(i);
      try {
        result &= mkdir(p);
      } catch (FileAlreadyExistsException e) {
        if (i + 1 < paths.size()) {
          throw new ParentNotDirectoryException(e.getMessage());
        }
        throw e;
      }
    }
    return result;
  }

  private boolean mkdir(Path path) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
    } else if (inode.isFile()) {
      throw new FileAlreadyExistsException(String.format(
          "Can't make directory for path %s since it is a file.",
          absolutePath));
    }
    return true;
  }

  @Override
  public boolean isFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      return false;
    }
    return inode.isFile();
  }

  private INode checkFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    String message = String.format("No such file: '%s'", path.toString());
    if (inode == null) {
      throw new FileNotFoundException(message + " does not exist");
    }
    if (inode.isDirectory()) {
      throw new FileNotFoundException(message + " is a directory");
    }
    return inode;
  }

  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }
    if (inode.isFile()) {
      return new FileStatus[] {
          new S3FileStatus(f.makeQualified(this), inode)
      };
    }
    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
    for (Path p : store.listSubPaths(absolutePath)) {
      ret.add(getFileStatus(p.makeQualified(this)));
    }
    return ret.toArray(new FileStatus[0]);
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }
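
  /*
   * For illustration, a typical write through create() below; the path and
   * data are hypothetical, and overwrite=true replaces an existing file as
   * implemented in create():
   *
   *   Path p = new Path("/user/alice/data.bin");
   *   try (FSDataOutputStream out = fs.create(p, true)) {
   *     out.write(bytes);
   *   }
   */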

  /**
   * @param permission Currently ignored.
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize,
      short replication, long blockSize, Progressable progress)
      throws IOException {

    INode inode = store.retrieveINode(makeAbsolute(file));
    if (inode != null) {
      if (overwrite && !inode.isDirectory()) {
        delete(file, true);
      } else {
        String message = String.format("File already exists: '%s'", file);
        if (inode.isDirectory()) {
          message = message + " is a directory";
        }
        throw new FileAlreadyExistsException(message);
      }
    } else {
      Path parent = file.getParent();
      if (parent != null) {
        if (!mkdirs(parent)) {
          throw new IOException("Mkdirs failed to create " + parent.toString());
        }
      }
    }
    return new FSDataOutputStream(
        new S3OutputStream(getConf(), store, makeAbsolute(file),
            blockSize, progress, bufferSize),
        statistics);
  }

  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    INode inode = checkFile(path);
    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
        statistics));
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    Path absoluteSrc = makeAbsolute(src);
    INode srcINode = store.retrieveINode(absoluteSrc);
    if (srcINode == null) {
      // src path doesn't exist
      return false;
    }
    Path absoluteDst = makeAbsolute(dst);
    INode dstINode = store.retrieveINode(absoluteDst);
    if (dstINode != null && dstINode.isDirectory()) {
      // renaming onto an existing directory: move src underneath it
      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
      dstINode = store.retrieveINode(absoluteDst);
    }
    if (dstINode != null) {
      // dst path already exists - can't overwrite
      return false;
    }
    Path dstParent = absoluteDst.getParent();
    if (dstParent != null) {
      INode dstParentINode = store.retrieveINode(dstParent);
      if (dstParentINode == null || dstParentINode.isFile()) {
        // dst parent doesn't exist or is a file
        return false;
      }
    }
    return renameRecursive(absoluteSrc, absoluteDst);
  }

  private boolean renameRecursive(Path src, Path dst) throws IOException {
    // A rename only rewrites INode metadata: each INode is stored under its
    // new path and removed from the old one; the data blocks it references
    // stay where they are and are not copied.
    INode srcINode = store.retrieveINode(src);
    store.storeINode(dst, srcINode);
    store.deleteINode(src);
    if (srcINode.isDirectory()) {
      for (Path oldSrc : store.listDeepSubPaths(src)) {
        INode inode = store.retrieveINode(oldSrc);
        if (inode == null) {
          return false;
        }
        String oldSrcPath = oldSrc.toUri().getPath();
        String srcPath = src.toUri().getPath();
        String dstPath = dst.toUri().getPath();
        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
        store.storeINode(newDst, inode);
        store.deleteINode(oldSrc);
      }
    }
    return true;
  }
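
  /*
   * For illustration, rename() above follows the usual FileSystem contract
   * for renaming onto an existing directory (paths are hypothetical):
   *
   *   fs.rename(new Path("/logs/part-0"), new Path("/archive"));
   *   // if /archive is an existing directory, the file ends up at /archive/part-0
   */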

  @Override
  public boolean delete(Path path, boolean recursive) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      return false;
    }
    if (inode.isFile()) {
      store.deleteINode(absolutePath);
      for (Block block : inode.getBlocks()) {
        store.deleteBlock(block);
      }
    } else {
      FileStatus[] contents = null;
      try {
        contents = listStatus(absolutePath);
      } catch (FileNotFoundException fnfe) {
        return false;
      }

      if ((contents.length != 0) && (!recursive)) {
        throw new IOException("Directory " + path.toString()
            + " is not empty.");
      }
      for (FileStatus p : contents) {
        if (!delete(p.getPath(), recursive)) {
          return false;
        }
      }
      store.deleteINode(absolutePath);
    }
    return true;
  }

  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(f));
    if (inode == null) {
      throw new FileNotFoundException(f + ": No such file or directory.");
    }
    return new S3FileStatus(f.makeQualified(this), inode);
  }

  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }

  // diagnostic methods

  void dump() throws IOException {
    store.dump();
  }

  void purge() throws IOException {
    store.purge();
  }

  /**
   * FileStatus for S3 file systems.
   */
  private static class S3FileStatus extends FileStatus {

    S3FileStatus(Path f, INode inode) throws IOException {
      super(findLength(inode), inode.isDirectory(), 1,
          findBlocksize(inode), 0, f);
    }

    private static long findLength(INode inode) {
      if (!inode.isDirectory()) {
        long length = 0L;
        for (Block block : inode.getBlocks()) {
          length += block.getLength();
        }
        return length;
      }
      return 0;
    }

    private static long findBlocksize(INode inode) {
      final Block[] ret = inode.getBlocks();
      return ret == null ? 0L : ret[0].getLength();
    }
  }
}