001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs.ftp;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.net.ConnectException;
024import java.net.URI;
025
026import com.google.common.base.Preconditions;
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.apache.commons.net.ftp.FTP;
030import org.apache.commons.net.ftp.FTPClient;
031import org.apache.commons.net.ftp.FTPFile;
032import org.apache.commons.net.ftp.FTPReply;
033import org.apache.hadoop.classification.InterfaceAudience;
034import org.apache.hadoop.classification.InterfaceStability;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FSDataInputStream;
037import org.apache.hadoop.fs.FSDataOutputStream;
038import org.apache.hadoop.fs.FileAlreadyExistsException;
039import org.apache.hadoop.fs.FileStatus;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.ParentNotDirectoryException;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.permission.FsAction;
044import org.apache.hadoop.fs.permission.FsPermission;
045import org.apache.hadoop.net.NetUtils;
046import org.apache.hadoop.util.Progressable;
047
048/**
049 * <p>
050 * A {@link FileSystem} backed by an FTP client provided by <a
051 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
052 * </p>
053 */
054@InterfaceAudience.Public
055@InterfaceStability.Stable
056public class FTPFileSystem extends FileSystem {
057
058  public static final Log LOG = LogFactory
059      .getLog(FTPFileSystem.class);
060
061  public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
062
063  public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
064  public static final String FS_FTP_USER_PREFIX = "fs.ftp.user.";
065  public static final String FS_FTP_HOST = "fs.ftp.host";
066  public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
067  public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
068  public static final String E_SAME_DIRECTORY_ONLY =
069      "only same directory renames are supported";
070
071  private URI uri;
072
073  /**
074   * Return the protocol scheme for the FileSystem.
075   * <p/>
076   *
077   * @return <code>ftp</code>
078   */
079  @Override
080  public String getScheme() {
081    return "ftp";
082  }
083
084  /**
085   * Get the default port for this FTPFileSystem.
086   *
087   * @return the default port
088   */
089  @Override
090  protected int getDefaultPort() {
091    return FTP.DEFAULT_PORT;
092  }
093
094  @Override
095  public void initialize(URI uri, Configuration conf) throws IOException { // get
096    super.initialize(uri, conf);
097    // get host information from uri (overrides info in conf)
098    String host = uri.getHost();
099    host = (host == null) ? conf.get(FS_FTP_HOST, null) : host;
100    if (host == null) {
101      throw new IOException("Invalid host specified");
102    }
103    conf.set(FS_FTP_HOST, host);
104
105    // get port information from uri, (overrides info in conf)
106    int port = uri.getPort();
107    port = (port == -1) ? FTP.DEFAULT_PORT : port;
108    conf.setInt(FS_FTP_HOST_PORT, port);
109
110    // get user/password information from URI (overrides info in conf)
111    String userAndPassword = uri.getUserInfo();
112    if (userAndPassword == null) {
113      userAndPassword = (conf.get(FS_FTP_USER_PREFIX + host, null) + ":" + conf
114          .get(FS_FTP_PASSWORD_PREFIX + host, null));
115    }
116    String[] userPasswdInfo = userAndPassword.split(":");
117    Preconditions.checkState(userPasswdInfo.length > 1,
118                             "Invalid username / password");
119    conf.set(FS_FTP_USER_PREFIX + host, userPasswdInfo[0]);
120    conf.set(FS_FTP_PASSWORD_PREFIX + host, userPasswdInfo[1]);
121    setConf(conf);
122    this.uri = uri;
123  }
124
125  /**
126   * Connect to the FTP server using configuration parameters *
127   * 
128   * @return An FTPClient instance
129   * @throws IOException
130   */
131  private FTPClient connect() throws IOException {
132    FTPClient client = null;
133    Configuration conf = getConf();
134    String host = conf.get(FS_FTP_HOST);
135    int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
136    String user = conf.get(FS_FTP_USER_PREFIX + host);
137    String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
138    client = new FTPClient();
139    client.connect(host, port);
140    int reply = client.getReplyCode();
141    if (!FTPReply.isPositiveCompletion(reply)) {
142      throw NetUtils.wrapException(host, port,
143                   NetUtils.UNKNOWN_HOST, 0,
144                   new ConnectException("Server response " + reply));
145    } else if (client.login(user, password)) {
146      client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
147      client.setFileType(FTP.BINARY_FILE_TYPE);
148      client.setBufferSize(DEFAULT_BUFFER_SIZE);
149    } else {
150      throw new IOException("Login failed on server - " + host + ", port - "
151          + port + " as user '" + user + "'");
152    }
153
154    return client;
155  }
156
157  /**
158   * Logout and disconnect the given FTPClient. *
159   * 
160   * @param client
161   * @throws IOException
162   */
163  private void disconnect(FTPClient client) throws IOException {
164    if (client != null) {
165      if (!client.isConnected()) {
166        throw new FTPException("Client not connected");
167      }
168      boolean logoutSuccess = client.logout();
169      client.disconnect();
170      if (!logoutSuccess) {
171        LOG.warn("Logout failed while disconnecting, error code - "
172            + client.getReplyCode());
173      }
174    }
175  }
176
177  /**
178   * Resolve against given working directory. *
179   * 
180   * @param workDir
181   * @param path
182   * @return
183   */
184  private Path makeAbsolute(Path workDir, Path path) {
185    if (path.isAbsolute()) {
186      return path;
187    }
188    return new Path(workDir, path);
189  }
190
191  @Override
192  public FSDataInputStream open(Path file, int bufferSize) throws IOException {
193    FTPClient client = connect();
194    Path workDir = new Path(client.printWorkingDirectory());
195    Path absolute = makeAbsolute(workDir, file);
196    FileStatus fileStat = getFileStatus(client, absolute);
197    if (fileStat.isDirectory()) {
198      disconnect(client);
199      throw new FileNotFoundException("Path " + file + " is a directory.");
200    }
201    client.allocate(bufferSize);
202    Path parent = absolute.getParent();
203    // Change to parent directory on the
204    // server. Only then can we read the
205    // file
206    // on the server by opening up an InputStream. As a side effect the working
207    // directory on the server is changed to the parent directory of the file.
208    // The FTP client connection is closed when close() is called on the
209    // FSDataInputStream.
210    client.changeWorkingDirectory(parent.toUri().getPath());
211    InputStream is = client.retrieveFileStream(file.getName());
212    FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
213        client, statistics));
214    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
215      // The ftpClient is an inconsistent state. Must close the stream
216      // which in turn will logout and disconnect from FTP server
217      fis.close();
218      throw new IOException("Unable to open file: " + file + ", Aborting");
219    }
220    return fis;
221  }
222
223  /**
224   * A stream obtained via this call must be closed before using other APIs of
225   * this class or else the invocation will block.
226   */
227  @Override
228  public FSDataOutputStream create(Path file, FsPermission permission,
229      boolean overwrite, int bufferSize, short replication, long blockSize,
230      Progressable progress) throws IOException {
231    final FTPClient client = connect();
232    Path workDir = new Path(client.printWorkingDirectory());
233    Path absolute = makeAbsolute(workDir, file);
234    FileStatus status;
235    try {
236      status = getFileStatus(client, file);
237    } catch (FileNotFoundException fnfe) {
238      status = null;
239    }
240    if (status != null) {
241      if (overwrite && !status.isDirectory()) {
242        delete(client, file, false);
243      } else {
244        disconnect(client);
245        throw new FileAlreadyExistsException("File already exists: " + file);
246      }
247    }
248    
249    Path parent = absolute.getParent();
250    if (parent == null || !mkdirs(client, parent, FsPermission.getDirDefault())) {
251      parent = (parent == null) ? new Path("/") : parent;
252      disconnect(client);
253      throw new IOException("create(): Mkdirs failed to create: " + parent);
254    }
255    client.allocate(bufferSize);
256    // Change to parent directory on the server. Only then can we write to the
257    // file on the server by opening up an OutputStream. As a side effect the
258    // working directory on the server is changed to the parent directory of the
259    // file. The FTP client connection is closed when close() is called on the
260    // FSDataOutputStream.
261    client.changeWorkingDirectory(parent.toUri().getPath());
262    FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
263        .getName()), statistics) {
264      @Override
265      public void close() throws IOException {
266        super.close();
267        if (!client.isConnected()) {
268          throw new FTPException("Client not connected");
269        }
270        boolean cmdCompleted = client.completePendingCommand();
271        disconnect(client);
272        if (!cmdCompleted) {
273          throw new FTPException("Could not complete transfer, Reply Code - "
274              + client.getReplyCode());
275        }
276      }
277    };
278    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
279      // The ftpClient is an inconsistent state. Must close the stream
280      // which in turn will logout and disconnect from FTP server
281      fos.close();
282      throw new IOException("Unable to create file: " + file + ", Aborting");
283    }
284    return fos;
285  }
286
287  /** This optional operation is not yet supported. */
288  @Override
289  public FSDataOutputStream append(Path f, int bufferSize,
290      Progressable progress) throws IOException {
291    throw new IOException("Not supported");
292  }
293  
294  /**
295   * Convenience method, so that we don't open a new connection when using this
296   * method from within another method. Otherwise every API invocation incurs
297   * the overhead of opening/closing a TCP connection.
298   * @throws IOException on IO problems other than FileNotFoundException
299   */
300  private boolean exists(FTPClient client, Path file) throws IOException {
301    try {
302      getFileStatus(client, file);
303      return true;
304    } catch (FileNotFoundException fnfe) {
305      return false;
306    }
307  }
308
309  @Override
310  public boolean delete(Path file, boolean recursive) throws IOException {
311    FTPClient client = connect();
312    try {
313      boolean success = delete(client, file, recursive);
314      return success;
315    } finally {
316      disconnect(client);
317    }
318  }
319
320  /**
321   * Convenience method, so that we don't open a new connection when using this
322   * method from within another method. Otherwise every API invocation incurs
323   * the overhead of opening/closing a TCP connection.
324   */
325  private boolean delete(FTPClient client, Path file, boolean recursive)
326      throws IOException {
327    Path workDir = new Path(client.printWorkingDirectory());
328    Path absolute = makeAbsolute(workDir, file);
329    String pathName = absolute.toUri().getPath();
330    try {
331      FileStatus fileStat = getFileStatus(client, absolute);
332      if (fileStat.isFile()) {
333        return client.deleteFile(pathName);
334      }
335    } catch (FileNotFoundException e) {
336      //the file is not there
337      return false;
338    }
339    FileStatus[] dirEntries = listStatus(client, absolute);
340    if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
341      throw new IOException("Directory: " + file + " is not empty.");
342    }
343    for (FileStatus dirEntry : dirEntries) {
344      delete(client, new Path(absolute, dirEntry.getPath()), recursive);
345    }
346    return client.removeDirectory(pathName);
347  }
348
349  private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
350    FsAction action = FsAction.NONE;
351    if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
352      action.or(FsAction.READ);
353    }
354    if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
355      action.or(FsAction.WRITE);
356    }
357    if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
358      action.or(FsAction.EXECUTE);
359    }
360    return action;
361  }
362
363  private FsPermission getPermissions(FTPFile ftpFile) {
364    FsAction user, group, others;
365    user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
366    group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
367    others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
368    return new FsPermission(user, group, others);
369  }
370
371  @Override
372  public URI getUri() {
373    return uri;
374  }
375
376  @Override
377  public FileStatus[] listStatus(Path file) throws IOException {
378    FTPClient client = connect();
379    try {
380      FileStatus[] stats = listStatus(client, file);
381      return stats;
382    } finally {
383      disconnect(client);
384    }
385  }
386
387  /**
388   * Convenience method, so that we don't open a new connection when using this
389   * method from within another method. Otherwise every API invocation incurs
390   * the overhead of opening/closing a TCP connection.
391   */
392  private FileStatus[] listStatus(FTPClient client, Path file)
393      throws IOException {
394    Path workDir = new Path(client.printWorkingDirectory());
395    Path absolute = makeAbsolute(workDir, file);
396    FileStatus fileStat = getFileStatus(client, absolute);
397    if (fileStat.isFile()) {
398      return new FileStatus[] { fileStat };
399    }
400    FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
401    FileStatus[] fileStats = new FileStatus[ftpFiles.length];
402    for (int i = 0; i < ftpFiles.length; i++) {
403      fileStats[i] = getFileStatus(ftpFiles[i], absolute);
404    }
405    return fileStats;
406  }
407
408  @Override
409  public FileStatus getFileStatus(Path file) throws IOException {
410    FTPClient client = connect();
411    try {
412      FileStatus status = getFileStatus(client, file);
413      return status;
414    } finally {
415      disconnect(client);
416    }
417  }
418
419  /**
420   * Convenience method, so that we don't open a new connection when using this
421   * method from within another method. Otherwise every API invocation incurs
422   * the overhead of opening/closing a TCP connection.
423   */
424  private FileStatus getFileStatus(FTPClient client, Path file)
425      throws IOException {
426    FileStatus fileStat = null;
427    Path workDir = new Path(client.printWorkingDirectory());
428    Path absolute = makeAbsolute(workDir, file);
429    Path parentPath = absolute.getParent();
430    if (parentPath == null) { // root dir
431      long length = -1; // Length of root dir on server not known
432      boolean isDir = true;
433      int blockReplication = 1;
434      long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
435      long modTime = -1; // Modification time of root dir not known.
436      Path root = new Path("/");
437      return new FileStatus(length, isDir, blockReplication, blockSize,
438          modTime, root.makeQualified(this));
439    }
440    String pathName = parentPath.toUri().getPath();
441    FTPFile[] ftpFiles = client.listFiles(pathName);
442    if (ftpFiles != null) {
443      for (FTPFile ftpFile : ftpFiles) {
444        if (ftpFile.getName().equals(file.getName())) { // file found in dir
445          fileStat = getFileStatus(ftpFile, parentPath);
446          break;
447        }
448      }
449      if (fileStat == null) {
450        throw new FileNotFoundException("File " + file + " does not exist.");
451      }
452    } else {
453      throw new FileNotFoundException("File " + file + " does not exist.");
454    }
455    return fileStat;
456  }
457
458  /**
459   * Convert the file information in FTPFile to a {@link FileStatus} object. *
460   * 
461   * @param ftpFile
462   * @param parentPath
463   * @return FileStatus
464   */
465  private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
466    long length = ftpFile.getSize();
467    boolean isDir = ftpFile.isDirectory();
468    int blockReplication = 1;
469    // Using default block size since there is no way in FTP client to know of
470    // block sizes on server. The assumption could be less than ideal.
471    long blockSize = DEFAULT_BLOCK_SIZE;
472    long modTime = ftpFile.getTimestamp().getTimeInMillis();
473    long accessTime = 0;
474    FsPermission permission = getPermissions(ftpFile);
475    String user = ftpFile.getUser();
476    String group = ftpFile.getGroup();
477    Path filePath = new Path(parentPath, ftpFile.getName());
478    return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
479        accessTime, permission, user, group, filePath.makeQualified(this));
480  }
481
482  @Override
483  public boolean mkdirs(Path file, FsPermission permission) throws IOException {
484    FTPClient client = connect();
485    try {
486      boolean success = mkdirs(client, file, permission);
487      return success;
488    } finally {
489      disconnect(client);
490    }
491  }
492
493  /**
494   * Convenience method, so that we don't open a new connection when using this
495   * method from within another method. Otherwise every API invocation incurs
496   * the overhead of opening/closing a TCP connection.
497   */
498  private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
499      throws IOException {
500    boolean created = true;
501    Path workDir = new Path(client.printWorkingDirectory());
502    Path absolute = makeAbsolute(workDir, file);
503    String pathName = absolute.getName();
504    if (!exists(client, absolute)) {
505      Path parent = absolute.getParent();
506      created = (parent == null || mkdirs(client, parent, FsPermission
507          .getDirDefault()));
508      if (created) {
509        String parentDir = parent.toUri().getPath();
510        client.changeWorkingDirectory(parentDir);
511        created = created && client.makeDirectory(pathName);
512      }
513    } else if (isFile(client, absolute)) {
514      throw new ParentNotDirectoryException(String.format(
515          "Can't make directory for path %s since it is a file.", absolute));
516    }
517    return created;
518  }
519
520  /**
521   * Convenience method, so that we don't open a new connection when using this
522   * method from within another method. Otherwise every API invocation incurs
523   * the overhead of opening/closing a TCP connection.
524   */
525  private boolean isFile(FTPClient client, Path file) {
526    try {
527      return getFileStatus(client, file).isFile();
528    } catch (FileNotFoundException e) {
529      return false; // file does not exist
530    } catch (IOException ioe) {
531      throw new FTPException("File check failed", ioe);
532    }
533  }
534
535  /*
536   * Assuming that parent of both source and destination is the same. Is the
537   * assumption correct or it is suppose to work like 'move' ?
538   */
539  @Override
540  public boolean rename(Path src, Path dst) throws IOException {
541    FTPClient client = connect();
542    try {
543      boolean success = rename(client, src, dst);
544      return success;
545    } finally {
546      disconnect(client);
547    }
548  }
549
550  /**
551   * Probe for a path being a parent of another
552   * @param parent parent path
553   * @param child possible child path
554   * @return true if the parent's path matches the start of the child's
555   */
556  private boolean isParentOf(Path parent, Path child) {
557    URI parentURI = parent.toUri();
558    String parentPath = parentURI.getPath();
559    if (!parentPath.endsWith("/")) {
560      parentPath += "/";
561    }
562    URI childURI = child.toUri();
563    String childPath = childURI.getPath();
564    return childPath.startsWith(parentPath);
565  }
566
567  /**
568   * Convenience method, so that we don't open a new connection when using this
569   * method from within another method. Otherwise every API invocation incurs
570   * the overhead of opening/closing a TCP connection.
571   * 
572   * @param client
573   * @param src
574   * @param dst
575   * @return
576   * @throws IOException
577   */
578  private boolean rename(FTPClient client, Path src, Path dst)
579      throws IOException {
580    Path workDir = new Path(client.printWorkingDirectory());
581    Path absoluteSrc = makeAbsolute(workDir, src);
582    Path absoluteDst = makeAbsolute(workDir, dst);
583    if (!exists(client, absoluteSrc)) {
584      throw new FileNotFoundException("Source path " + src + " does not exist");
585    }
586    if (isDirectory(absoluteDst)) {
587      // destination is a directory: rename goes underneath it with the
588      // source name
589      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
590    }
591    if (exists(client, absoluteDst)) {
592      throw new FileAlreadyExistsException("Destination path " + dst
593          + " already exists");
594    }
595    String parentSrc = absoluteSrc.getParent().toUri().toString();
596    String parentDst = absoluteDst.getParent().toUri().toString();
597    if (isParentOf(absoluteSrc, absoluteDst)) {
598      throw new IOException("Cannot rename " + absoluteSrc + " under itself"
599      + " : "+ absoluteDst);
600    }
601
602    if (!parentSrc.equals(parentDst)) {
603      throw new IOException("Cannot rename source: " + absoluteSrc
604          + " to " + absoluteDst
605          + " -"+ E_SAME_DIRECTORY_ONLY);
606    }
607    String from = absoluteSrc.getName();
608    String to = absoluteDst.getName();
609    client.changeWorkingDirectory(parentSrc);
610    boolean renamed = client.rename(from, to);
611    return renamed;
612  }
613
614  @Override
615  public Path getWorkingDirectory() {
616    // Return home directory always since we do not maintain state.
617    return getHomeDirectory();
618  }
619
620  @Override
621  public Path getHomeDirectory() {
622    FTPClient client = null;
623    try {
624      client = connect();
625      Path homeDir = new Path(client.printWorkingDirectory());
626      return homeDir;
627    } catch (IOException ioe) {
628      throw new FTPException("Failed to get home directory", ioe);
629    } finally {
630      try {
631        disconnect(client);
632      } catch (IOException ioe) {
633        throw new FTPException("Failed to disconnect", ioe);
634      }
635    }
636  }
637
638  @Override
639  public void setWorkingDirectory(Path newDir) {
640    // we do not maintain the working directory state
641  }
642}