/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A block-based {@link FileSystem} backed by
 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
 * </p>
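 * <p>
 * Instances are normally obtained through the generic {@link FileSystem}
 * factory rather than constructed directly; a minimal sketch, assuming a
 * placeholder bucket name and credentials already present in the
 * {@link Configuration}:
 * </p>
 * <pre>{@code
 * Configuration conf = new Configuration();
 * // "example-bucket" is a placeholder, not a real bucket
 * FileSystem fs = FileSystem.get(URI.create("s3://example-bucket/"), conf);
 * FileStatus[] listing = fs.listStatus(new Path("/"));
 * }</pre>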
 * @see NativeS3FileSystem
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class S3FileSystem extends FileSystem {

  private URI uri;

  private FileSystemStore store;

  private Path workingDir;

  public S3FileSystem() {
    // set store in initialize()
  }

  public S3FileSystem(FileSystemStore store) {
    this.store = store;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>s3</code>
   */
  @Override
  public String getScheme() {
    return "s3";
  }

  @Override
  public URI getUri() {
    return uri;
  }

  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir =
      new Path("/user", System.getProperty("user.name")).makeQualified(this);
  }

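  /**
   * Build the default {@link FileSystemStore}, wrapped in a retry proxy so
   * that transient failures while storing or retrieving blocks are retried.
   * The retry count and sleep interval come from {@code fs.s3.maxRetries}
   * and {@code fs.s3.sleepTimeSeconds}.
   */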
  private static FileSystemStore createDefaultStore(Configuration conf) {
    FileSystemStore store = new Jets3tFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        conf.getInt("fs.s3.maxRetries", 4),
        conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
        new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String, RetryPolicy> methodNameToPolicyMap =
        new HashMap<String, RetryPolicy>();
    methodNameToPolicyMap.put("storeBlock", methodPolicy);
    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);

    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
        store, methodNameToPolicyMap);
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public void setWorkingDirectory(Path dir) {
    workingDir = makeAbsolute(dir);
  }

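  /** Resolve a possibly relative path against the current working directory. */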
  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * @param permission Currently ignored.
   */
  @Override
  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(path);
    List<Path> paths = new ArrayList<Path>();
    do {
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (Path p : paths) {
      result &= mkdir(p);
    }
    return result;
  }

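  /**
   * Create a single directory INode if it does not already exist; fails if a
   * file occupies the path.
   */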
  private boolean mkdir(Path path) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
    } else if (inode.isFile()) {
      throw new IOException(String.format(
          "Can't make directory for path %s since it is a file.",
          absolutePath));
    }
    return true;
  }

  @Override
  public boolean isFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      return false;
    }
    return inode.isFile();
  }

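  /**
   * Retrieve the INode for a path that is expected to be an existing file,
   * failing if it is missing or is a directory.
   */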
  private INode checkFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      throw new IOException("No such file: " + path);
    }
    if (inode.isDirectory()) {
      throw new IOException("Path " + path + " is a directory.");
    }
    return inode;
  }

  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }
    if (inode.isFile()) {
      return new FileStatus[] {
        new S3FileStatus(f.makeQualified(this), inode)
      };
    }
    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
    for (Path p : store.listSubPaths(absolutePath)) {
      ret.add(getFileStatus(p.makeQualified(this)));
    }
    return ret.toArray(new FileStatus[0]);
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * @param permission Currently ignored.
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize,
      short replication, long blockSize, Progressable progress)
      throws IOException {

    INode inode = store.retrieveINode(makeAbsolute(file));
    if (inode != null) {
      if (overwrite) {
        delete(file, true);
      } else {
        throw new FileAlreadyExistsException("File already exists: " + file);
      }
    } else {
      Path parent = file.getParent();
      if (parent != null) {
        if (!mkdirs(parent)) {
          throw new IOException("Mkdirs failed to create " + parent.toString());
        }
      }
    }
    return new FSDataOutputStream(
        new S3OutputStream(getConf(), store, makeAbsolute(file),
            blockSize, progress, bufferSize),
        statistics);
  }

  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    INode inode = checkFile(path);
    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
        statistics));
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    Path absoluteSrc = makeAbsolute(src);
    INode srcINode = store.retrieveINode(absoluteSrc);
    if (srcINode == null) {
      // src path doesn't exist
      return false;
    }
    Path absoluteDst = makeAbsolute(dst);
    INode dstINode = store.retrieveINode(absoluteDst);
    if (dstINode != null && dstINode.isDirectory()) {
      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
      dstINode = store.retrieveINode(absoluteDst);
    }
    if (dstINode != null) {
      // dst path already exists - can't overwrite
      return false;
    }
    Path dstParent = absoluteDst.getParent();
    if (dstParent != null) {
      INode dstParentINode = store.retrieveINode(dstParent);
      if (dstParentINode == null || dstParentINode.isFile()) {
        // dst parent doesn't exist or is a file
        return false;
      }
    }
    return renameRecursive(absoluteSrc, absoluteDst);
  }

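  /**
   * Move a single INode and, if it is a directory, every INode beneath it, by
   * re-storing each one under the destination prefix and deleting the
   * original.
   */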
  private boolean renameRecursive(Path src, Path dst) throws IOException {
    INode srcINode = store.retrieveINode(src);
    store.storeINode(dst, srcINode);
    store.deleteINode(src);
    if (srcINode.isDirectory()) {
      for (Path oldSrc : store.listDeepSubPaths(src)) {
        INode inode = store.retrieveINode(oldSrc);
        if (inode == null) {
          return false;
        }
        String oldSrcPath = oldSrc.toUri().getPath();
        String srcPath = src.toUri().getPath();
        String dstPath = dst.toUri().getPath();
        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
        store.storeINode(newDst, inode);
        store.deleteINode(oldSrc);
      }
    }
    return true;
  }

  @Override
  public boolean delete(Path path, boolean recursive) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      return false;
    }
    if (inode.isFile()) {
      store.deleteINode(absolutePath);
      for (Block block : inode.getBlocks()) {
        store.deleteBlock(block);
      }
    } else {
      FileStatus[] contents = null;
      try {
        contents = listStatus(absolutePath);
      } catch (FileNotFoundException fnfe) {
        return false;
      }

      if ((contents.length != 0) && (!recursive)) {
        throw new IOException("Directory " + path.toString()
            + " is not empty.");
      }
      for (FileStatus p : contents) {
        if (!delete(p.getPath(), recursive)) {
          return false;
        }
      }
      store.deleteINode(absolutePath);
    }
    return true;
  }

  /**
   * Return a {@link FileStatus} describing the given path.
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(f));
    if (inode == null) {
      throw new FileNotFoundException(f + ": No such file or directory.");
    }
    return new S3FileStatus(f.makeQualified(this), inode);
  }

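  /** Default block size, configurable via {@code fs.s3.block.size}; 64 MB if unset. */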
  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }

  // diagnostic methods

  void dump() throws IOException {
    store.dump();
  }

  void purge() throws IOException {
    store.purge();
  }

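  /** FileStatus for S3 file systems. */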
  private static class S3FileStatus extends FileStatus {

    S3FileStatus(Path f, INode inode) throws IOException {
      super(findLength(inode), inode.isDirectory(), 1,
          findBlocksize(inode), 0, f);
    }

    private static long findLength(INode inode) {
      if (!inode.isDirectory()) {
        long length = 0L;
        for (Block block : inode.getBlocks()) {
          length += block.getLength();
        }
        return length;
      }
      return 0;
    }

    private static long findBlocksize(INode inode) {
      final Block[] ret = inode.getBlocks();
      return ret == null ? 0L : ret[0].getLength();
    }
  }
}