001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs.ftp;
019
020 import java.io.FileNotFoundException;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.net.ConnectException;
024 import java.net.URI;
025
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.commons.net.ftp.FTP;
029 import org.apache.commons.net.ftp.FTPClient;
030 import org.apache.commons.net.ftp.FTPFile;
031 import org.apache.commons.net.ftp.FTPReply;
032 import org.apache.hadoop.classification.InterfaceAudience;
033 import org.apache.hadoop.classification.InterfaceStability;
034 import org.apache.hadoop.conf.Configuration;
035 import org.apache.hadoop.fs.FSDataInputStream;
036 import org.apache.hadoop.fs.FSDataOutputStream;
037 import org.apache.hadoop.fs.FileAlreadyExistsException;
038 import org.apache.hadoop.fs.FileStatus;
039 import org.apache.hadoop.fs.FileSystem;
040 import org.apache.hadoop.fs.ParentNotDirectoryException;
041 import org.apache.hadoop.fs.Path;
042 import org.apache.hadoop.fs.permission.FsAction;
043 import org.apache.hadoop.fs.permission.FsPermission;
044 import org.apache.hadoop.net.NetUtils;
045 import org.apache.hadoop.util.Progressable;
046
047 /**
048 * <p>
049 * A {@link FileSystem} backed by an FTP client provided by <a
050 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
051 * </p>
052 */
053 @InterfaceAudience.Public
054 @InterfaceStability.Stable
055 public class FTPFileSystem extends FileSystem {
056
057 public static final Log LOG = LogFactory
058 .getLog(FTPFileSystem.class);
059
060 public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
061
062 public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
063 public static final String FS_FTP_USER_PREFIX = "fs.ftp.user.";
064 public static final String FS_FTP_HOST = "fs.ftp.host";
065 public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
066 public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
067 public static final String E_SAME_DIRECTORY_ONLY =
068 "only same directory renames are supported";
069
070 private URI uri;
071
072 /**
073 * Return the protocol scheme for the FileSystem.
074 * <p/>
075 *
076 * @return <code>ftp</code>
077 */
078 @Override
079 public String getScheme() {
080 return "ftp";
081 }
082
083 @Override
084 public void initialize(URI uri, Configuration conf) throws IOException { // get
085 super.initialize(uri, conf);
086 // get host information from uri (overrides info in conf)
087 String host = uri.getHost();
088 host = (host == null) ? conf.get(FS_FTP_HOST, null) : host;
089 if (host == null) {
090 throw new IOException("Invalid host specified");
091 }
092 conf.set(FS_FTP_HOST, host);
093
094 // get port information from uri, (overrides info in conf)
095 int port = uri.getPort();
096 port = (port == -1) ? FTP.DEFAULT_PORT : port;
097 conf.setInt("fs.ftp.host.port", port);
098
099 // get user/password information from URI (overrides info in conf)
100 String userAndPassword = uri.getUserInfo();
101 if (userAndPassword == null) {
102 userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf
103 .get("fs.ftp.password." + host, null));
104 if (userAndPassword == null) {
105 throw new IOException("Invalid user/passsword specified");
106 }
107 }
108 String[] userPasswdInfo = userAndPassword.split(":");
109 conf.set(FS_FTP_USER_PREFIX + host, userPasswdInfo[0]);
110 if (userPasswdInfo.length > 1) {
111 conf.set(FS_FTP_PASSWORD_PREFIX + host, userPasswdInfo[1]);
112 } else {
113 conf.set(FS_FTP_PASSWORD_PREFIX + host, null);
114 }
115 setConf(conf);
116 this.uri = uri;
117 }
118
119 /**
120 * Connect to the FTP server using configuration parameters *
121 *
122 * @return An FTPClient instance
123 * @throws IOException
124 */
125 private FTPClient connect() throws IOException {
126 FTPClient client = null;
127 Configuration conf = getConf();
128 String host = conf.get(FS_FTP_HOST);
129 int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
130 String user = conf.get(FS_FTP_USER_PREFIX + host);
131 String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
132 client = new FTPClient();
133 client.connect(host, port);
134 int reply = client.getReplyCode();
135 if (!FTPReply.isPositiveCompletion(reply)) {
136 throw NetUtils.wrapException(host, port,
137 NetUtils.UNKNOWN_HOST, 0,
138 new ConnectException("Server response " + reply));
139 } else if (client.login(user, password)) {
140 client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
141 client.setFileType(FTP.BINARY_FILE_TYPE);
142 client.setBufferSize(DEFAULT_BUFFER_SIZE);
143 } else {
144 throw new IOException("Login failed on server - " + host + ", port - "
145 + port + " as user '" + user + "'");
146 }
147
148 return client;
149 }
150
151 /**
152 * Logout and disconnect the given FTPClient. *
153 *
154 * @param client
155 * @throws IOException
156 */
157 private void disconnect(FTPClient client) throws IOException {
158 if (client != null) {
159 if (!client.isConnected()) {
160 throw new FTPException("Client not connected");
161 }
162 boolean logoutSuccess = client.logout();
163 client.disconnect();
164 if (!logoutSuccess) {
165 LOG.warn("Logout failed while disconnecting, error code - "
166 + client.getReplyCode());
167 }
168 }
169 }
170
171 /**
172 * Resolve against given working directory. *
173 *
174 * @param workDir
175 * @param path
176 * @return
177 */
178 private Path makeAbsolute(Path workDir, Path path) {
179 if (path.isAbsolute()) {
180 return path;
181 }
182 return new Path(workDir, path);
183 }
184
185 @Override
186 public FSDataInputStream open(Path file, int bufferSize) throws IOException {
187 FTPClient client = connect();
188 Path workDir = new Path(client.printWorkingDirectory());
189 Path absolute = makeAbsolute(workDir, file);
190 FileStatus fileStat = getFileStatus(client, absolute);
191 if (fileStat.isDirectory()) {
192 disconnect(client);
193 throw new FileNotFoundException("Path " + file + " is a directory.");
194 }
195 client.allocate(bufferSize);
196 Path parent = absolute.getParent();
197 // Change to parent directory on the
198 // server. Only then can we read the
199 // file
200 // on the server by opening up an InputStream. As a side effect the working
201 // directory on the server is changed to the parent directory of the file.
202 // The FTP client connection is closed when close() is called on the
203 // FSDataInputStream.
204 client.changeWorkingDirectory(parent.toUri().getPath());
205 InputStream is = client.retrieveFileStream(file.getName());
206 FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
207 client, statistics));
208 if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
209 // The ftpClient is an inconsistent state. Must close the stream
210 // which in turn will logout and disconnect from FTP server
211 fis.close();
212 throw new IOException("Unable to open file: " + file + ", Aborting");
213 }
214 return fis;
215 }
216
217 /**
218 * A stream obtained via this call must be closed before using other APIs of
219 * this class or else the invocation will block.
220 */
221 @Override
222 public FSDataOutputStream create(Path file, FsPermission permission,
223 boolean overwrite, int bufferSize, short replication, long blockSize,
224 Progressable progress) throws IOException {
225 final FTPClient client = connect();
226 Path workDir = new Path(client.printWorkingDirectory());
227 Path absolute = makeAbsolute(workDir, file);
228 FileStatus status;
229 try {
230 status = getFileStatus(client, file);
231 } catch (FileNotFoundException fnfe) {
232 status = null;
233 }
234 if (status != null) {
235 if (overwrite && !status.isDirectory()) {
236 delete(client, file, false);
237 } else {
238 disconnect(client);
239 throw new FileAlreadyExistsException("File already exists: " + file);
240 }
241 }
242
243 Path parent = absolute.getParent();
244 if (parent == null || !mkdirs(client, parent, FsPermission.getDirDefault())) {
245 parent = (parent == null) ? new Path("/") : parent;
246 disconnect(client);
247 throw new IOException("create(): Mkdirs failed to create: " + parent);
248 }
249 client.allocate(bufferSize);
250 // Change to parent directory on the server. Only then can we write to the
251 // file on the server by opening up an OutputStream. As a side effect the
252 // working directory on the server is changed to the parent directory of the
253 // file. The FTP client connection is closed when close() is called on the
254 // FSDataOutputStream.
255 client.changeWorkingDirectory(parent.toUri().getPath());
256 FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
257 .getName()), statistics) {
258 @Override
259 public void close() throws IOException {
260 super.close();
261 if (!client.isConnected()) {
262 throw new FTPException("Client not connected");
263 }
264 boolean cmdCompleted = client.completePendingCommand();
265 disconnect(client);
266 if (!cmdCompleted) {
267 throw new FTPException("Could not complete transfer, Reply Code - "
268 + client.getReplyCode());
269 }
270 }
271 };
272 if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
273 // The ftpClient is an inconsistent state. Must close the stream
274 // which in turn will logout and disconnect from FTP server
275 fos.close();
276 throw new IOException("Unable to create file: " + file + ", Aborting");
277 }
278 return fos;
279 }
280
281 /** This optional operation is not yet supported. */
282 @Override
283 public FSDataOutputStream append(Path f, int bufferSize,
284 Progressable progress) throws IOException {
285 throw new IOException("Not supported");
286 }
287
288 /**
289 * Convenience method, so that we don't open a new connection when using this
290 * method from within another method. Otherwise every API invocation incurs
291 * the overhead of opening/closing a TCP connection.
292 * @throws IOException on IO problems other than FileNotFoundException
293 */
294 private boolean exists(FTPClient client, Path file) throws IOException {
295 try {
296 return getFileStatus(client, file) != null;
297 } catch (FileNotFoundException fnfe) {
298 return false;
299 }
300 }
301
302 @Override
303 public boolean delete(Path file, boolean recursive) throws IOException {
304 FTPClient client = connect();
305 try {
306 boolean success = delete(client, file, recursive);
307 return success;
308 } finally {
309 disconnect(client);
310 }
311 }
312
313 /**
314 * Convenience method, so that we don't open a new connection when using this
315 * method from within another method. Otherwise every API invocation incurs
316 * the overhead of opening/closing a TCP connection.
317 */
318 private boolean delete(FTPClient client, Path file, boolean recursive)
319 throws IOException {
320 Path workDir = new Path(client.printWorkingDirectory());
321 Path absolute = makeAbsolute(workDir, file);
322 String pathName = absolute.toUri().getPath();
323 try {
324 FileStatus fileStat = getFileStatus(client, absolute);
325 if (fileStat.isFile()) {
326 return client.deleteFile(pathName);
327 }
328 } catch (FileNotFoundException e) {
329 //the file is not there
330 return false;
331 }
332 FileStatus[] dirEntries = listStatus(client, absolute);
333 if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
334 throw new IOException("Directory: " + file + " is not empty.");
335 }
336 if (dirEntries != null) {
337 for (int i = 0; i < dirEntries.length; i++) {
338 delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
339 }
340 }
341 return client.removeDirectory(pathName);
342 }
343
344 private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
345 FsAction action = FsAction.NONE;
346 if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
347 action.or(FsAction.READ);
348 }
349 if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
350 action.or(FsAction.WRITE);
351 }
352 if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
353 action.or(FsAction.EXECUTE);
354 }
355 return action;
356 }
357
358 private FsPermission getPermissions(FTPFile ftpFile) {
359 FsAction user, group, others;
360 user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
361 group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
362 others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
363 return new FsPermission(user, group, others);
364 }
365
366 @Override
367 public URI getUri() {
368 return uri;
369 }
370
371 @Override
372 public FileStatus[] listStatus(Path file) throws IOException {
373 FTPClient client = connect();
374 try {
375 FileStatus[] stats = listStatus(client, file);
376 return stats;
377 } finally {
378 disconnect(client);
379 }
380 }
381
382 /**
383 * Convenience method, so that we don't open a new connection when using this
384 * method from within another method. Otherwise every API invocation incurs
385 * the overhead of opening/closing a TCP connection.
386 */
387 private FileStatus[] listStatus(FTPClient client, Path file)
388 throws IOException {
389 Path workDir = new Path(client.printWorkingDirectory());
390 Path absolute = makeAbsolute(workDir, file);
391 FileStatus fileStat = getFileStatus(client, absolute);
392 if (fileStat.isFile()) {
393 return new FileStatus[] { fileStat };
394 }
395 FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
396 FileStatus[] fileStats = new FileStatus[ftpFiles.length];
397 for (int i = 0; i < ftpFiles.length; i++) {
398 fileStats[i] = getFileStatus(ftpFiles[i], absolute);
399 }
400 return fileStats;
401 }
402
403 @Override
404 public FileStatus getFileStatus(Path file) throws IOException {
405 FTPClient client = connect();
406 try {
407 FileStatus status = getFileStatus(client, file);
408 return status;
409 } finally {
410 disconnect(client);
411 }
412 }
413
414 /**
415 * Convenience method, so that we don't open a new connection when using this
416 * method from within another method. Otherwise every API invocation incurs
417 * the overhead of opening/closing a TCP connection.
418 */
419 private FileStatus getFileStatus(FTPClient client, Path file)
420 throws IOException {
421 FileStatus fileStat = null;
422 Path workDir = new Path(client.printWorkingDirectory());
423 Path absolute = makeAbsolute(workDir, file);
424 Path parentPath = absolute.getParent();
425 if (parentPath == null) { // root dir
426 long length = -1; // Length of root dir on server not known
427 boolean isDir = true;
428 int blockReplication = 1;
429 long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
430 long modTime = -1; // Modification time of root dir not known.
431 Path root = new Path("/");
432 return new FileStatus(length, isDir, blockReplication, blockSize,
433 modTime, root.makeQualified(this));
434 }
435 String pathName = parentPath.toUri().getPath();
436 FTPFile[] ftpFiles = client.listFiles(pathName);
437 if (ftpFiles != null) {
438 for (FTPFile ftpFile : ftpFiles) {
439 if (ftpFile.getName().equals(file.getName())) { // file found in dir
440 fileStat = getFileStatus(ftpFile, parentPath);
441 break;
442 }
443 }
444 if (fileStat == null) {
445 throw new FileNotFoundException("File " + file + " does not exist.");
446 }
447 } else {
448 throw new FileNotFoundException("File " + file + " does not exist.");
449 }
450 return fileStat;
451 }
452
453 /**
454 * Convert the file information in FTPFile to a {@link FileStatus} object. *
455 *
456 * @param ftpFile
457 * @param parentPath
458 * @return FileStatus
459 */
460 private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
461 long length = ftpFile.getSize();
462 boolean isDir = ftpFile.isDirectory();
463 int blockReplication = 1;
464 // Using default block size since there is no way in FTP client to know of
465 // block sizes on server. The assumption could be less than ideal.
466 long blockSize = DEFAULT_BLOCK_SIZE;
467 long modTime = ftpFile.getTimestamp().getTimeInMillis();
468 long accessTime = 0;
469 FsPermission permission = getPermissions(ftpFile);
470 String user = ftpFile.getUser();
471 String group = ftpFile.getGroup();
472 Path filePath = new Path(parentPath, ftpFile.getName());
473 return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
474 accessTime, permission, user, group, filePath.makeQualified(this));
475 }
476
477 @Override
478 public boolean mkdirs(Path file, FsPermission permission) throws IOException {
479 FTPClient client = connect();
480 try {
481 boolean success = mkdirs(client, file, permission);
482 return success;
483 } finally {
484 disconnect(client);
485 }
486 }
487
488 /**
489 * Convenience method, so that we don't open a new connection when using this
490 * method from within another method. Otherwise every API invocation incurs
491 * the overhead of opening/closing a TCP connection.
492 */
493 private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
494 throws IOException {
495 boolean created = true;
496 Path workDir = new Path(client.printWorkingDirectory());
497 Path absolute = makeAbsolute(workDir, file);
498 String pathName = absolute.getName();
499 if (!exists(client, absolute)) {
500 Path parent = absolute.getParent();
501 created = (parent == null || mkdirs(client, parent, FsPermission
502 .getDirDefault()));
503 if (created) {
504 String parentDir = parent.toUri().getPath();
505 client.changeWorkingDirectory(parentDir);
506 created = created && client.makeDirectory(pathName);
507 }
508 } else if (isFile(client, absolute)) {
509 throw new ParentNotDirectoryException(String.format(
510 "Can't make directory for path %s since it is a file.", absolute));
511 }
512 return created;
513 }
514
515 /**
516 * Convenience method, so that we don't open a new connection when using this
517 * method from within another method. Otherwise every API invocation incurs
518 * the overhead of opening/closing a TCP connection.
519 */
520 private boolean isFile(FTPClient client, Path file) {
521 try {
522 return getFileStatus(client, file).isFile();
523 } catch (FileNotFoundException e) {
524 return false; // file does not exist
525 } catch (IOException ioe) {
526 throw new FTPException("File check failed", ioe);
527 }
528 }
529
530 /*
531 * Assuming that parent of both source and destination is the same. Is the
532 * assumption correct or it is suppose to work like 'move' ?
533 */
534 @Override
535 public boolean rename(Path src, Path dst) throws IOException {
536 FTPClient client = connect();
537 try {
538 boolean success = rename(client, src, dst);
539 return success;
540 } finally {
541 disconnect(client);
542 }
543 }
544
545 /**
546 * Probe for a path being a parent of another
547 * @param parent parent path
548 * @param child possible child path
549 * @return true if the parent's path matches the start of the child's
550 */
551 private boolean isParentOf(Path parent, Path child) {
552 URI parentURI = parent.toUri();
553 String parentPath = parentURI.getPath();
554 if (!parentPath.endsWith("/")) {
555 parentPath += "/";
556 }
557 URI childURI = child.toUri();
558 String childPath = childURI.getPath();
559 return childPath.startsWith(parentPath);
560 }
561
562 /**
563 * Convenience method, so that we don't open a new connection when using this
564 * method from within another method. Otherwise every API invocation incurs
565 * the overhead of opening/closing a TCP connection.
566 *
567 * @param client
568 * @param src
569 * @param dst
570 * @return
571 * @throws IOException
572 */
573 private boolean rename(FTPClient client, Path src, Path dst)
574 throws IOException {
575 Path workDir = new Path(client.printWorkingDirectory());
576 Path absoluteSrc = makeAbsolute(workDir, src);
577 Path absoluteDst = makeAbsolute(workDir, dst);
578 if (!exists(client, absoluteSrc)) {
579 throw new FileNotFoundException("Source path " + src + " does not exist");
580 }
581 if (isDirectory(absoluteDst)) {
582 // destination is a directory: rename goes underneath it with the
583 // source name
584 absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
585 }
586 if (exists(client, absoluteDst)) {
587 throw new FileAlreadyExistsException("Destination path " + dst
588 + " already exists");
589 }
590 String parentSrc = absoluteSrc.getParent().toUri().toString();
591 String parentDst = absoluteDst.getParent().toUri().toString();
592 if (isParentOf(absoluteSrc, absoluteDst)) {
593 throw new IOException("Cannot rename " + absoluteSrc + " under itself"
594 + " : "+ absoluteDst);
595 }
596
597 if (!parentSrc.equals(parentDst)) {
598 throw new IOException("Cannot rename source: " + absoluteSrc
599 + " to " + absoluteDst
600 + " -"+ E_SAME_DIRECTORY_ONLY);
601 }
602 String from = absoluteSrc.getName();
603 String to = absoluteDst.getName();
604 client.changeWorkingDirectory(parentSrc);
605 boolean renamed = client.rename(from, to);
606 return renamed;
607 }
608
609 @Override
610 public Path getWorkingDirectory() {
611 // Return home directory always since we do not maintain state.
612 return getHomeDirectory();
613 }
614
615 @Override
616 public Path getHomeDirectory() {
617 FTPClient client = null;
618 try {
619 client = connect();
620 Path homeDir = new Path(client.printWorkingDirectory());
621 return homeDir;
622 } catch (IOException ioe) {
623 throw new FTPException("Failed to get home directory", ioe);
624 } finally {
625 try {
626 disconnect(client);
627 } catch (IOException ioe) {
628 throw new FTPException("Failed to disconnect", ioe);
629 }
630 }
631 }
632
633 @Override
634 public void setWorkingDirectory(Path newDir) {
635 // we do not maintain the working directory state
636 }
637 }