018package org.apache.hadoop.fs.ftp;
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.net.URI;
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.commons.net.ftp.FTP;
028import org.apache.commons.net.ftp.FTPClient;
029import org.apache.commons.net.ftp.FTPFile;
030import org.apache.commons.net.ftp.FTPReply;
031import org.apache.hadoop.classification.InterfaceAudience;
032import org.apache.hadoop.classification.InterfaceStability;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.fs.FSDataOutputStream;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.fs.permission.FsAction;
040import org.apache.hadoop.fs.permission.FsPermission;
041import org.apache.hadoop.util.Progressable;
/**
 * <p>
 * A {@link FileSystem} backed by an FTP client provided by <a
 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
 * </p>
 */
051public class FTPFileSystem extends FileSystem {
053  public static final Log LOG = LogFactory
054      .getLog(FTPFileSystem.class);
056  public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
058  public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
060  private URI uri;
062  @Override
063  public void initialize(URI uri, Configuration conf) throws IOException { // get
064    super.initialize(uri, conf);
065    // get host information from uri (overrides info in conf)
066    String host = uri.getHost();
067    host = (host == null) ? conf.get("fs.ftp.host", null) : host;
068    if (host == null) {
069      throw new IOException("Invalid host specified");
070    }
071    conf.set("fs.ftp.host", host);
073    // get port information from uri, (overrides info in conf)
074    int port = uri.getPort();
075    port = (port == -1) ? FTP.DEFAULT_PORT : port;
076    conf.setInt("fs.ftp.host.port", port);
078    // get user/password information from URI (overrides info in conf)
079    String userAndPassword = uri.getUserInfo();
080    if (userAndPassword == null) {
081      userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf
082          .get("fs.ftp.password." + host, null));
083      if (userAndPassword == null) {
084        throw new IOException("Invalid user/passsword specified");
085      }
086    }
087    String[] userPasswdInfo = userAndPassword.split(":");
088    conf.set("fs.ftp.user." + host, userPasswdInfo[0]);
089    if (userPasswdInfo.length > 1) {
090      conf.set("fs.ftp.password." + host, userPasswdInfo[1]);
091    } else {
092      conf.set("fs.ftp.password." + host, null);
093    }
094    setConf(conf);
095    this.uri = uri;
096  }
098  /**
099   * Connect to the FTP server using configuration parameters *
100   * 
101   * @return An FTPClient instance
102   * @throws IOException
103   */
104  private FTPClient connect() throws IOException {
105    FTPClient client = null;
106    Configuration conf = getConf();
107    String host = conf.get("fs.ftp.host");
108    int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
109    String user = conf.get("fs.ftp.user." + host);
110    String password = conf.get("fs.ftp.password." + host);
111    client = new FTPClient();
112    client.connect(host, port);
113    int reply = client.getReplyCode();
114    if (!FTPReply.isPositiveCompletion(reply)) {
115      throw new IOException("Server - " + host
116          + " refused connection on port - " + port);
117    } else if (client.login(user, password)) {
118      client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
119      client.setFileType(FTP.BINARY_FILE_TYPE);
120      client.setBufferSize(DEFAULT_BUFFER_SIZE);
121    } else {
122      throw new IOException("Login failed on server - " + host + ", port - "
123          + port);
124    }
126    return client;
127  }
129  /**
130   * Logout and disconnect the given FTPClient. *
131   * 
132   * @param client
133   * @throws IOException
134   */
135  private void disconnect(FTPClient client) throws IOException {
136    if (client != null) {
137      if (!client.isConnected()) {
138        throw new FTPException("Client not connected");
139      }
140      boolean logoutSuccess = client.logout();
141      client.disconnect();
142      if (!logoutSuccess) {
143        LOG.warn("Logout failed while disconnecting, error code - "
144            + client.getReplyCode());
145      }
146    }
147  }
149  /**
150   * Resolve against given working directory. *
151   * 
152   * @param workDir
153   * @param path
154   * @return
155   */
156  private Path makeAbsolute(Path workDir, Path path) {
157    if (path.isAbsolute()) {
158      return path;
159    }
160    return new Path(workDir, path);
161  }
163  @Override
164  public FSDataInputStream open(Path file, int bufferSize) throws IOException {
165    FTPClient client = connect();
166    Path workDir = new Path(client.printWorkingDirectory());
167    Path absolute = makeAbsolute(workDir, file);
168    FileStatus fileStat = getFileStatus(client, absolute);
169    if (fileStat.isDirectory()) {
170      disconnect(client);
171      throw new IOException("Path " + file + " is a directory.");
172    }
173    client.allocate(bufferSize);
174    Path parent = absolute.getParent();
175    // Change to parent directory on the
176    // server. Only then can we read the
177    // file
178    // on the server by opening up an InputStream. As a side effect the working
179    // directory on the server is changed to the parent directory of the file.
180    // The FTP client connection is closed when close() is called on the
181    // FSDataInputStream.
182    client.changeWorkingDirectory(parent.toUri().getPath());
183    InputStream is = client.retrieveFileStream(file.getName());
184    FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
185        client, statistics));
186    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
187      // The ftpClient is an inconsistent state. Must close the stream
188      // which in turn will logout and disconnect from FTP server
189      fis.close();
190      throw new IOException("Unable to open file: " + file + ", Aborting");
191    }
192    return fis;
193  }
195  /**
196   * A stream obtained via this call must be closed before using other APIs of
197   * this class or else the invocation will block.
198   */
199  @Override
200  public FSDataOutputStream create(Path file, FsPermission permission,
201      boolean overwrite, int bufferSize, short replication, long blockSize,
202      Progressable progress) throws IOException {
203    final FTPClient client = connect();
204    Path workDir = new Path(client.printWorkingDirectory());
205    Path absolute = makeAbsolute(workDir, file);
206    if (exists(client, file)) {
207      if (overwrite) {
208        delete(client, file);
209      } else {
210        disconnect(client);
211        throw new IOException("File already exists: " + file);
212      }
213    }
215    Path parent = absolute.getParent();
216    if (parent == null || !mkdirs(client, parent, FsPermission.getDirDefault())) {
217      parent = (parent == null) ? new Path("/") : parent;
218      disconnect(client);
219      throw new IOException("create(): Mkdirs failed to create: " + parent);
220    }
221    client.allocate(bufferSize);
222    // Change to parent directory on the server. Only then can we write to the
223    // file on the server by opening up an OutputStream. As a side effect the
224    // working directory on the server is changed to the parent directory of the
225    // file. The FTP client connection is closed when close() is called on the
226    // FSDataOutputStream.
227    client.changeWorkingDirectory(parent.toUri().getPath());
228    FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
229        .getName()), statistics) {
230      @Override
231      public void close() throws IOException {
232        super.close();
233        if (!client.isConnected()) {
234          throw new FTPException("Client not connected");
235        }
236        boolean cmdCompleted = client.completePendingCommand();
237        disconnect(client);
238        if (!cmdCompleted) {
239          throw new FTPException("Could not complete transfer, Reply Code - "
240              + client.getReplyCode());
241        }
242      }
243    };
244    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
245      // The ftpClient is an inconsistent state. Must close the stream
246      // which in turn will logout and disconnect from FTP server
247      fos.close();
248      throw new IOException("Unable to create file: " + file + ", Aborting");
249    }
250    return fos;
251  }
253  /** This optional operation is not yet supported. */
254  public FSDataOutputStream append(Path f, int bufferSize,
255      Progressable progress) throws IOException {
256    throw new IOException("Not supported");
257  }
259  /**
260   * Convenience method, so that we don't open a new connection when using this
261   * method from within another method. Otherwise every API invocation incurs
262   * the overhead of opening/closing a TCP connection.
263   */
264  private boolean exists(FTPClient client, Path file) {
265    try {
266      return getFileStatus(client, file) != null;
267    } catch (FileNotFoundException fnfe) {
268      return false;
269    } catch (IOException ioe) {
270      throw new FTPException("Failed to get file status", ioe);
271    }
272  }
274  @Override
275  public boolean delete(Path file, boolean recursive) throws IOException {
276    FTPClient client = connect();
277    try {
278      boolean success = delete(client, file, recursive);
279      return success;
280    } finally {
281      disconnect(client);
282    }
283  }
285  /** @deprecated Use delete(Path, boolean) instead */
286  @Deprecated
287  private boolean delete(FTPClient client, Path file) throws IOException {
288    return delete(client, file, false);
289  }
291  /**
292   * Convenience method, so that we don't open a new connection when using this
293   * method from within another method. Otherwise every API invocation incurs
294   * the overhead of opening/closing a TCP connection.
295   */
296  private boolean delete(FTPClient client, Path file, boolean recursive)
297      throws IOException {
298    Path workDir = new Path(client.printWorkingDirectory());
299    Path absolute = makeAbsolute(workDir, file);
300    String pathName = absolute.toUri().getPath();
301    FileStatus fileStat = getFileStatus(client, absolute);
302    if (fileStat.isFile()) {
303      return client.deleteFile(pathName);
304    }
305    FileStatus[] dirEntries = listStatus(client, absolute);
306    if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
307      throw new IOException("Directory: " + file + " is not empty.");
308    }
309    if (dirEntries != null) {
310      for (int i = 0; i < dirEntries.length; i++) {
311        delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
312      }
313    }
314    return client.removeDirectory(pathName);
315  }
317  private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
318    FsAction action = FsAction.NONE;
319    if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
320      action.or(FsAction.READ);
321    }
322    if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
323      action.or(FsAction.WRITE);
324    }
325    if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
326      action.or(FsAction.EXECUTE);
327    }
328    return action;
329  }
331  private FsPermission getPermissions(FTPFile ftpFile) {
332    FsAction user, group, others;
333    user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
334    group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
335    others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
336    return new FsPermission(user, group, others);
337  }
339  @Override
340  public URI getUri() {
341    return uri;
342  }
344  @Override
345  public FileStatus[] listStatus(Path file) throws IOException {
346    FTPClient client = connect();
347    try {
348      FileStatus[] stats = listStatus(client, file);
349      return stats;
350    } finally {
351      disconnect(client);
352    }
353  }
355  /**
356   * Convenience method, so that we don't open a new connection when using this
357   * method from within another method. Otherwise every API invocation incurs
358   * the overhead of opening/closing a TCP connection.
359   */
360  private FileStatus[] listStatus(FTPClient client, Path file)
361      throws IOException {
362    Path workDir = new Path(client.printWorkingDirectory());
363    Path absolute = makeAbsolute(workDir, file);
364    FileStatus fileStat = getFileStatus(client, absolute);
365    if (fileStat.isFile()) {
366      return new FileStatus[] { fileStat };
367    }
368    FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
369    FileStatus[] fileStats = new FileStatus[ftpFiles.length];
370    for (int i = 0; i < ftpFiles.length; i++) {
371      fileStats[i] = getFileStatus(ftpFiles[i], absolute);
372    }
373    return fileStats;
374  }
376  @Override
377  public FileStatus getFileStatus(Path file) throws IOException {
378    FTPClient client = connect();
379    try {
380      FileStatus status = getFileStatus(client, file);
381      return status;
382    } finally {
383      disconnect(client);
384    }
385  }
387  /**
388   * Convenience method, so that we don't open a new connection when using this
389   * method from within another method. Otherwise every API invocation incurs
390   * the overhead of opening/closing a TCP connection.
391   */
392  private FileStatus getFileStatus(FTPClient client, Path file)
393      throws IOException {
394    FileStatus fileStat = null;
395    Path workDir = new Path(client.printWorkingDirectory());
396    Path absolute = makeAbsolute(workDir, file);
397    Path parentPath = absolute.getParent();
398    if (parentPath == null) { // root dir
399      long length = -1; // Length of root dir on server not known
400      boolean isDir = true;
401      int blockReplication = 1;
402      long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
403      long modTime = -1; // Modification time of root dir not known.
404      Path root = new Path("/");
405      return new FileStatus(length, isDir, blockReplication, blockSize,
406          modTime, root.makeQualified(this));
407    }
408    String pathName = parentPath.toUri().getPath();
409    FTPFile[] ftpFiles = client.listFiles(pathName);
410    if (ftpFiles != null) {
411      for (FTPFile ftpFile : ftpFiles) {
412        if (ftpFile.getName().equals(file.getName())) { // file found in dir
413          fileStat = getFileStatus(ftpFile, parentPath);
414          break;
415        }
416      }
417      if (fileStat == null) {
418        throw new FileNotFoundException("File " + file + " does not exist.");
419      }
420    } else {
421      throw new FileNotFoundException("File " + file + " does not exist.");
422    }
423    return fileStat;
424  }
426  /**
427   * Convert the file information in FTPFile to a {@link FileStatus} object. *
428   * 
429   * @param ftpFile
430   * @param parentPath
431   * @return FileStatus
432   */
433  private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
434    long length = ftpFile.getSize();
435    boolean isDir = ftpFile.isDirectory();
436    int blockReplication = 1;
437    // Using default block size since there is no way in FTP client to know of
438    // block sizes on server. The assumption could be less than ideal.
439    long blockSize = DEFAULT_BLOCK_SIZE;
440    long modTime = ftpFile.getTimestamp().getTimeInMillis();
441    long accessTime = 0;
442    FsPermission permission = getPermissions(ftpFile);
443    String user = ftpFile.getUser();
444    String group = ftpFile.getGroup();
445    Path filePath = new Path(parentPath, ftpFile.getName());
446    return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
447        accessTime, permission, user, group, filePath.makeQualified(this));
448  }
450  @Override
451  public boolean mkdirs(Path file, FsPermission permission) throws IOException {
452    FTPClient client = connect();
453    try {
454      boolean success = mkdirs(client, file, permission);
455      return success;
456    } finally {
457      disconnect(client);
458    }
459  }
461  /**
462   * Convenience method, so that we don't open a new connection when using this
463   * method from within another method. Otherwise every API invocation incurs
464   * the overhead of opening/closing a TCP connection.
465   */
466  private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
467      throws IOException {
468    boolean created = true;
469    Path workDir = new Path(client.printWorkingDirectory());
470    Path absolute = makeAbsolute(workDir, file);
471    String pathName = absolute.getName();
472    if (!exists(client, absolute)) {
473      Path parent = absolute.getParent();
474      created = (parent == null || mkdirs(client, parent, FsPermission
475          .getDirDefault()));
476      if (created) {
477        String parentDir = parent.toUri().getPath();
478        client.changeWorkingDirectory(parentDir);
479        created = created && client.makeDirectory(pathName);
480      }
481    } else if (isFile(client, absolute)) {
482      throw new IOException(String.format(
483          "Can't make directory for path %s since it is a file.", absolute));
484    }
485    return created;
486  }
488  /**
489   * Convenience method, so that we don't open a new connection when using this
490   * method from within another method. Otherwise every API invocation incurs
491   * the overhead of opening/closing a TCP connection.
492   */
493  private boolean isFile(FTPClient client, Path file) {
494    try {
495      return getFileStatus(client, file).isFile();
496    } catch (FileNotFoundException e) {
497      return false; // file does not exist
498    } catch (IOException ioe) {
499      throw new FTPException("File check failed", ioe);
500    }
501  }
503  /*
504   * Assuming that parent of both source and destination is the same. Is the
505   * assumption correct or it is suppose to work like 'move' ?
506   */
507  @Override
508  public boolean rename(Path src, Path dst) throws IOException {
509    FTPClient client = connect();
510    try {
511      boolean success = rename(client, src, dst);
512      return success;
513    } finally {
514      disconnect(client);
515    }
516  }
518  /**
519   * Convenience method, so that we don't open a new connection when using this
520   * method from within another method. Otherwise every API invocation incurs
521   * the overhead of opening/closing a TCP connection.
522   * 
523   * @param client
524   * @param src
525   * @param dst
526   * @return
527   * @throws IOException
528   */
529  private boolean rename(FTPClient client, Path src, Path dst)
530      throws IOException {
531    Path workDir = new Path(client.printWorkingDirectory());
532    Path absoluteSrc = makeAbsolute(workDir, src);
533    Path absoluteDst = makeAbsolute(workDir, dst);
534    if (!exists(client, absoluteSrc)) {
535      throw new IOException("Source path " + src + " does not exist");
536    }
537    if (exists(client, absoluteDst)) {
538      throw new IOException("Destination path " + dst
539          + " already exist, cannot rename!");
540    }
541    String parentSrc = absoluteSrc.getParent().toUri().toString();
542    String parentDst = absoluteDst.getParent().toUri().toString();
543    String from = src.getName();
544    String to = dst.getName();
545    if (!parentSrc.equals(parentDst)) {
546      throw new IOException("Cannot rename parent(source): " + parentSrc
547          + ", parent(destination):  " + parentDst);
548    }
549    client.changeWorkingDirectory(parentSrc);
550    boolean renamed = client.rename(from, to);
551    return renamed;
552  }
554  @Override
555  public Path getWorkingDirectory() {
556    // Return home directory always since we do not maintain state.
557    return getHomeDirectory();
558  }
560  @Override
561  public Path getHomeDirectory() {
562    FTPClient client = null;
563    try {
564      client = connect();
565      Path homeDir = new Path(client.printWorkingDirectory());
566      return homeDir;
567    } catch (IOException ioe) {
568      throw new FTPException("Failed to get home directory", ioe);
569    } finally {
570      try {
571        disconnect(client);
572      } catch (IOException ioe) {
573        throw new FTPException("Failed to disconnect", ioe);
574      }
575    }
576  }
578  @Override
579  public void setWorkingDirectory(Path newDir) {
580    // we do not maintain the working directory state
581  }