001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs.ftp;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.net.ConnectException;
024import java.net.URI;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.commons.net.ftp.FTP;
029import org.apache.commons.net.ftp.FTPClient;
030import org.apache.commons.net.ftp.FTPFile;
031import org.apache.commons.net.ftp.FTPReply;
032import org.apache.hadoop.classification.InterfaceAudience;
033import org.apache.hadoop.classification.InterfaceStability;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FSDataInputStream;
036import org.apache.hadoop.fs.FSDataOutputStream;
037import org.apache.hadoop.fs.FileAlreadyExistsException;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.ParentNotDirectoryException;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.fs.permission.FsAction;
043import org.apache.hadoop.fs.permission.FsPermission;
044import org.apache.hadoop.net.NetUtils;
045import org.apache.hadoop.util.Progressable;
046
047/**
048 * <p>
049 * A {@link FileSystem} backed by an FTP client provided by <a
050 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
051 * </p>
052 */
053@InterfaceAudience.Public
054@InterfaceStability.Stable
055public class FTPFileSystem extends FileSystem {
056
057  public static final Log LOG = LogFactory
058      .getLog(FTPFileSystem.class);
059
060  public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
061
062  public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
063  public static final String FS_FTP_USER_PREFIX = "fs.ftp.user.";
064  public static final String FS_FTP_HOST = "fs.ftp.host";
065  public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
066  public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
067  public static final String E_SAME_DIRECTORY_ONLY =
068      "only same directory renames are supported";
069
070  private URI uri;
071
072  /**
073   * Return the protocol scheme for the FileSystem.
074   * <p/>
075   *
076   * @return <code>ftp</code>
077   */
078  @Override
079  public String getScheme() {
080    return "ftp";
081  }
082
083  @Override
084  public void initialize(URI uri, Configuration conf) throws IOException { // get
085    super.initialize(uri, conf);
086    // get host information from uri (overrides info in conf)
087    String host = uri.getHost();
088    host = (host == null) ? conf.get(FS_FTP_HOST, null) : host;
089    if (host == null) {
090      throw new IOException("Invalid host specified");
091    }
092    conf.set(FS_FTP_HOST, host);
093
094    // get port information from uri, (overrides info in conf)
095    int port = uri.getPort();
096    port = (port == -1) ? FTP.DEFAULT_PORT : port;
097    conf.setInt("fs.ftp.host.port", port);
098
099    // get user/password information from URI (overrides info in conf)
100    String userAndPassword = uri.getUserInfo();
101    if (userAndPassword == null) {
102      userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf
103          .get("fs.ftp.password." + host, null));
104      if (userAndPassword == null) {
105        throw new IOException("Invalid user/passsword specified");
106      }
107    }
108    String[] userPasswdInfo = userAndPassword.split(":");
109    conf.set(FS_FTP_USER_PREFIX + host, userPasswdInfo[0]);
110    if (userPasswdInfo.length > 1) {
111      conf.set(FS_FTP_PASSWORD_PREFIX + host, userPasswdInfo[1]);
112    } else {
113      conf.set(FS_FTP_PASSWORD_PREFIX + host, null);
114    }
115    setConf(conf);
116    this.uri = uri;
117  }
118
119  /**
120   * Connect to the FTP server using configuration parameters *
121   * 
122   * @return An FTPClient instance
123   * @throws IOException
124   */
125  private FTPClient connect() throws IOException {
126    FTPClient client = null;
127    Configuration conf = getConf();
128    String host = conf.get(FS_FTP_HOST);
129    int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
130    String user = conf.get(FS_FTP_USER_PREFIX + host);
131    String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
132    client = new FTPClient();
133    client.connect(host, port);
134    int reply = client.getReplyCode();
135    if (!FTPReply.isPositiveCompletion(reply)) {
136      throw NetUtils.wrapException(host, port,
137                   NetUtils.UNKNOWN_HOST, 0,
138                   new ConnectException("Server response " + reply));
139    } else if (client.login(user, password)) {
140      client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
141      client.setFileType(FTP.BINARY_FILE_TYPE);
142      client.setBufferSize(DEFAULT_BUFFER_SIZE);
143    } else {
144      throw new IOException("Login failed on server - " + host + ", port - "
145          + port + " as user '" + user + "'");
146    }
147
148    return client;
149  }
150
151  /**
152   * Logout and disconnect the given FTPClient. *
153   * 
154   * @param client
155   * @throws IOException
156   */
157  private void disconnect(FTPClient client) throws IOException {
158    if (client != null) {
159      if (!client.isConnected()) {
160        throw new FTPException("Client not connected");
161      }
162      boolean logoutSuccess = client.logout();
163      client.disconnect();
164      if (!logoutSuccess) {
165        LOG.warn("Logout failed while disconnecting, error code - "
166            + client.getReplyCode());
167      }
168    }
169  }
170
171  /**
172   * Resolve against given working directory. *
173   * 
174   * @param workDir
175   * @param path
176   * @return
177   */
178  private Path makeAbsolute(Path workDir, Path path) {
179    if (path.isAbsolute()) {
180      return path;
181    }
182    return new Path(workDir, path);
183  }
184
185  @Override
186  public FSDataInputStream open(Path file, int bufferSize) throws IOException {
187    FTPClient client = connect();
188    Path workDir = new Path(client.printWorkingDirectory());
189    Path absolute = makeAbsolute(workDir, file);
190    FileStatus fileStat = getFileStatus(client, absolute);
191    if (fileStat.isDirectory()) {
192      disconnect(client);
193      throw new FileNotFoundException("Path " + file + " is a directory.");
194    }
195    client.allocate(bufferSize);
196    Path parent = absolute.getParent();
197    // Change to parent directory on the
198    // server. Only then can we read the
199    // file
200    // on the server by opening up an InputStream. As a side effect the working
201    // directory on the server is changed to the parent directory of the file.
202    // The FTP client connection is closed when close() is called on the
203    // FSDataInputStream.
204    client.changeWorkingDirectory(parent.toUri().getPath());
205    InputStream is = client.retrieveFileStream(file.getName());
206    FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
207        client, statistics));
208    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
209      // The ftpClient is an inconsistent state. Must close the stream
210      // which in turn will logout and disconnect from FTP server
211      fis.close();
212      throw new IOException("Unable to open file: " + file + ", Aborting");
213    }
214    return fis;
215  }
216
217  /**
218   * A stream obtained via this call must be closed before using other APIs of
219   * this class or else the invocation will block.
220   */
221  @Override
222  public FSDataOutputStream create(Path file, FsPermission permission,
223      boolean overwrite, int bufferSize, short replication, long blockSize,
224      Progressable progress) throws IOException {
225    final FTPClient client = connect();
226    Path workDir = new Path(client.printWorkingDirectory());
227    Path absolute = makeAbsolute(workDir, file);
228    FileStatus status;
229    try {
230      status = getFileStatus(client, file);
231    } catch (FileNotFoundException fnfe) {
232      status = null;
233    }
234    if (status != null) {
235      if (overwrite && !status.isDirectory()) {
236        delete(client, file, false);
237      } else {
238        disconnect(client);
239        throw new FileAlreadyExistsException("File already exists: " + file);
240      }
241    }
242    
243    Path parent = absolute.getParent();
244    if (parent == null || !mkdirs(client, parent, FsPermission.getDirDefault())) {
245      parent = (parent == null) ? new Path("/") : parent;
246      disconnect(client);
247      throw new IOException("create(): Mkdirs failed to create: " + parent);
248    }
249    client.allocate(bufferSize);
250    // Change to parent directory on the server. Only then can we write to the
251    // file on the server by opening up an OutputStream. As a side effect the
252    // working directory on the server is changed to the parent directory of the
253    // file. The FTP client connection is closed when close() is called on the
254    // FSDataOutputStream.
255    client.changeWorkingDirectory(parent.toUri().getPath());
256    FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
257        .getName()), statistics) {
258      @Override
259      public void close() throws IOException {
260        super.close();
261        if (!client.isConnected()) {
262          throw new FTPException("Client not connected");
263        }
264        boolean cmdCompleted = client.completePendingCommand();
265        disconnect(client);
266        if (!cmdCompleted) {
267          throw new FTPException("Could not complete transfer, Reply Code - "
268              + client.getReplyCode());
269        }
270      }
271    };
272    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
273      // The ftpClient is an inconsistent state. Must close the stream
274      // which in turn will logout and disconnect from FTP server
275      fos.close();
276      throw new IOException("Unable to create file: " + file + ", Aborting");
277    }
278    return fos;
279  }
280
281  /** This optional operation is not yet supported. */
282  @Override
283  public FSDataOutputStream append(Path f, int bufferSize,
284      Progressable progress) throws IOException {
285    throw new IOException("Not supported");
286  }
287  
288  /**
289   * Convenience method, so that we don't open a new connection when using this
290   * method from within another method. Otherwise every API invocation incurs
291   * the overhead of opening/closing a TCP connection.
292   * @throws IOException on IO problems other than FileNotFoundException
293   */
294  private boolean exists(FTPClient client, Path file) throws IOException {
295    try {
296      return getFileStatus(client, file) != null;
297    } catch (FileNotFoundException fnfe) {
298      return false;
299    }
300  }
301
302  @Override
303  public boolean delete(Path file, boolean recursive) throws IOException {
304    FTPClient client = connect();
305    try {
306      boolean success = delete(client, file, recursive);
307      return success;
308    } finally {
309      disconnect(client);
310    }
311  }
312
313  /**
314   * Convenience method, so that we don't open a new connection when using this
315   * method from within another method. Otherwise every API invocation incurs
316   * the overhead of opening/closing a TCP connection.
317   */
318  private boolean delete(FTPClient client, Path file, boolean recursive)
319      throws IOException {
320    Path workDir = new Path(client.printWorkingDirectory());
321    Path absolute = makeAbsolute(workDir, file);
322    String pathName = absolute.toUri().getPath();
323    try {
324      FileStatus fileStat = getFileStatus(client, absolute);
325      if (fileStat.isFile()) {
326        return client.deleteFile(pathName);
327      }
328    } catch (FileNotFoundException e) {
329      //the file is not there
330      return false;
331    }
332    FileStatus[] dirEntries = listStatus(client, absolute);
333    if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
334      throw new IOException("Directory: " + file + " is not empty.");
335    }
336    if (dirEntries != null) {
337      for (int i = 0; i < dirEntries.length; i++) {
338        delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
339      }
340    }
341    return client.removeDirectory(pathName);
342  }
343
344  private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
345    FsAction action = FsAction.NONE;
346    if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
347      action.or(FsAction.READ);
348    }
349    if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
350      action.or(FsAction.WRITE);
351    }
352    if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
353      action.or(FsAction.EXECUTE);
354    }
355    return action;
356  }
357
358  private FsPermission getPermissions(FTPFile ftpFile) {
359    FsAction user, group, others;
360    user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
361    group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
362    others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
363    return new FsPermission(user, group, others);
364  }
365
366  @Override
367  public URI getUri() {
368    return uri;
369  }
370
371  @Override
372  public FileStatus[] listStatus(Path file) throws IOException {
373    FTPClient client = connect();
374    try {
375      FileStatus[] stats = listStatus(client, file);
376      return stats;
377    } finally {
378      disconnect(client);
379    }
380  }
381
382  /**
383   * Convenience method, so that we don't open a new connection when using this
384   * method from within another method. Otherwise every API invocation incurs
385   * the overhead of opening/closing a TCP connection.
386   */
387  private FileStatus[] listStatus(FTPClient client, Path file)
388      throws IOException {
389    Path workDir = new Path(client.printWorkingDirectory());
390    Path absolute = makeAbsolute(workDir, file);
391    FileStatus fileStat = getFileStatus(client, absolute);
392    if (fileStat.isFile()) {
393      return new FileStatus[] { fileStat };
394    }
395    FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
396    FileStatus[] fileStats = new FileStatus[ftpFiles.length];
397    for (int i = 0; i < ftpFiles.length; i++) {
398      fileStats[i] = getFileStatus(ftpFiles[i], absolute);
399    }
400    return fileStats;
401  }
402
403  @Override
404  public FileStatus getFileStatus(Path file) throws IOException {
405    FTPClient client = connect();
406    try {
407      FileStatus status = getFileStatus(client, file);
408      return status;
409    } finally {
410      disconnect(client);
411    }
412  }
413
414  /**
415   * Convenience method, so that we don't open a new connection when using this
416   * method from within another method. Otherwise every API invocation incurs
417   * the overhead of opening/closing a TCP connection.
418   */
419  private FileStatus getFileStatus(FTPClient client, Path file)
420      throws IOException {
421    FileStatus fileStat = null;
422    Path workDir = new Path(client.printWorkingDirectory());
423    Path absolute = makeAbsolute(workDir, file);
424    Path parentPath = absolute.getParent();
425    if (parentPath == null) { // root dir
426      long length = -1; // Length of root dir on server not known
427      boolean isDir = true;
428      int blockReplication = 1;
429      long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
430      long modTime = -1; // Modification time of root dir not known.
431      Path root = new Path("/");
432      return new FileStatus(length, isDir, blockReplication, blockSize,
433          modTime, root.makeQualified(this));
434    }
435    String pathName = parentPath.toUri().getPath();
436    FTPFile[] ftpFiles = client.listFiles(pathName);
437    if (ftpFiles != null) {
438      for (FTPFile ftpFile : ftpFiles) {
439        if (ftpFile.getName().equals(file.getName())) { // file found in dir
440          fileStat = getFileStatus(ftpFile, parentPath);
441          break;
442        }
443      }
444      if (fileStat == null) {
445        throw new FileNotFoundException("File " + file + " does not exist.");
446      }
447    } else {
448      throw new FileNotFoundException("File " + file + " does not exist.");
449    }
450    return fileStat;
451  }
452
453  /**
454   * Convert the file information in FTPFile to a {@link FileStatus} object. *
455   * 
456   * @param ftpFile
457   * @param parentPath
458   * @return FileStatus
459   */
460  private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
461    long length = ftpFile.getSize();
462    boolean isDir = ftpFile.isDirectory();
463    int blockReplication = 1;
464    // Using default block size since there is no way in FTP client to know of
465    // block sizes on server. The assumption could be less than ideal.
466    long blockSize = DEFAULT_BLOCK_SIZE;
467    long modTime = ftpFile.getTimestamp().getTimeInMillis();
468    long accessTime = 0;
469    FsPermission permission = getPermissions(ftpFile);
470    String user = ftpFile.getUser();
471    String group = ftpFile.getGroup();
472    Path filePath = new Path(parentPath, ftpFile.getName());
473    return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
474        accessTime, permission, user, group, filePath.makeQualified(this));
475  }
476
477  @Override
478  public boolean mkdirs(Path file, FsPermission permission) throws IOException {
479    FTPClient client = connect();
480    try {
481      boolean success = mkdirs(client, file, permission);
482      return success;
483    } finally {
484      disconnect(client);
485    }
486  }
487
488  /**
489   * Convenience method, so that we don't open a new connection when using this
490   * method from within another method. Otherwise every API invocation incurs
491   * the overhead of opening/closing a TCP connection.
492   */
493  private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
494      throws IOException {
495    boolean created = true;
496    Path workDir = new Path(client.printWorkingDirectory());
497    Path absolute = makeAbsolute(workDir, file);
498    String pathName = absolute.getName();
499    if (!exists(client, absolute)) {
500      Path parent = absolute.getParent();
501      created = (parent == null || mkdirs(client, parent, FsPermission
502          .getDirDefault()));
503      if (created) {
504        String parentDir = parent.toUri().getPath();
505        client.changeWorkingDirectory(parentDir);
506        created = created && client.makeDirectory(pathName);
507      }
508    } else if (isFile(client, absolute)) {
509      throw new ParentNotDirectoryException(String.format(
510          "Can't make directory for path %s since it is a file.", absolute));
511    }
512    return created;
513  }
514
515  /**
516   * Convenience method, so that we don't open a new connection when using this
517   * method from within another method. Otherwise every API invocation incurs
518   * the overhead of opening/closing a TCP connection.
519   */
520  private boolean isFile(FTPClient client, Path file) {
521    try {
522      return getFileStatus(client, file).isFile();
523    } catch (FileNotFoundException e) {
524      return false; // file does not exist
525    } catch (IOException ioe) {
526      throw new FTPException("File check failed", ioe);
527    }
528  }
529
530  /*
531   * Assuming that parent of both source and destination is the same. Is the
532   * assumption correct or it is suppose to work like 'move' ?
533   */
534  @Override
535  public boolean rename(Path src, Path dst) throws IOException {
536    FTPClient client = connect();
537    try {
538      boolean success = rename(client, src, dst);
539      return success;
540    } finally {
541      disconnect(client);
542    }
543  }
544
545  /**
546   * Probe for a path being a parent of another
547   * @param parent parent path
548   * @param child possible child path
549   * @return true if the parent's path matches the start of the child's
550   */
551  private boolean isParentOf(Path parent, Path child) {
552    URI parentURI = parent.toUri();
553    String parentPath = parentURI.getPath();
554    if (!parentPath.endsWith("/")) {
555      parentPath += "/";
556    }
557    URI childURI = child.toUri();
558    String childPath = childURI.getPath();
559    return childPath.startsWith(parentPath);
560  }
561
562  /**
563   * Convenience method, so that we don't open a new connection when using this
564   * method from within another method. Otherwise every API invocation incurs
565   * the overhead of opening/closing a TCP connection.
566   * 
567   * @param client
568   * @param src
569   * @param dst
570   * @return
571   * @throws IOException
572   */
573  private boolean rename(FTPClient client, Path src, Path dst)
574      throws IOException {
575    Path workDir = new Path(client.printWorkingDirectory());
576    Path absoluteSrc = makeAbsolute(workDir, src);
577    Path absoluteDst = makeAbsolute(workDir, dst);
578    if (!exists(client, absoluteSrc)) {
579      throw new FileNotFoundException("Source path " + src + " does not exist");
580    }
581    if (isDirectory(absoluteDst)) {
582      // destination is a directory: rename goes underneath it with the
583      // source name
584      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
585    }
586    if (exists(client, absoluteDst)) {
587      throw new FileAlreadyExistsException("Destination path " + dst
588          + " already exists");
589    }
590    String parentSrc = absoluteSrc.getParent().toUri().toString();
591    String parentDst = absoluteDst.getParent().toUri().toString();
592    if (isParentOf(absoluteSrc, absoluteDst)) {
593      throw new IOException("Cannot rename " + absoluteSrc + " under itself"
594      + " : "+ absoluteDst);
595    }
596
597    if (!parentSrc.equals(parentDst)) {
598      throw new IOException("Cannot rename source: " + absoluteSrc
599          + " to " + absoluteDst
600          + " -"+ E_SAME_DIRECTORY_ONLY);
601    }
602    String from = absoluteSrc.getName();
603    String to = absoluteDst.getName();
604    client.changeWorkingDirectory(parentSrc);
605    boolean renamed = client.rename(from, to);
606    return renamed;
607  }
608
609  @Override
610  public Path getWorkingDirectory() {
611    // Return home directory always since we do not maintain state.
612    return getHomeDirectory();
613  }
614
615  @Override
616  public Path getHomeDirectory() {
617    FTPClient client = null;
618    try {
619      client = connect();
620      Path homeDir = new Path(client.printWorkingDirectory());
621      return homeDir;
622    } catch (IOException ioe) {
623      throw new FTPException("Failed to get home directory", ioe);
624    } finally {
625      try {
626        disconnect(client);
627      } catch (IOException ioe) {
628        throw new FTPException("Failed to disconnect", ioe);
629      }
630    }
631  }
632
633  @Override
634  public void setWorkingDirectory(Path newDir) {
635    // we do not maintain the working directory state
636  }
637}