001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs.ftp;
019    
020    import java.io.FileNotFoundException;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.net.ConnectException;
024    import java.net.URI;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.commons.net.ftp.FTP;
029    import org.apache.commons.net.ftp.FTPClient;
030    import org.apache.commons.net.ftp.FTPFile;
031    import org.apache.commons.net.ftp.FTPReply;
032    import org.apache.hadoop.classification.InterfaceAudience;
033    import org.apache.hadoop.classification.InterfaceStability;
034    import org.apache.hadoop.conf.Configuration;
035    import org.apache.hadoop.fs.FSDataInputStream;
036    import org.apache.hadoop.fs.FSDataOutputStream;
037    import org.apache.hadoop.fs.FileAlreadyExistsException;
038    import org.apache.hadoop.fs.FileStatus;
039    import org.apache.hadoop.fs.FileSystem;
040    import org.apache.hadoop.fs.ParentNotDirectoryException;
041    import org.apache.hadoop.fs.Path;
042    import org.apache.hadoop.fs.permission.FsAction;
043    import org.apache.hadoop.fs.permission.FsPermission;
044    import org.apache.hadoop.net.NetUtils;
045    import org.apache.hadoop.util.Progressable;
046    
047    /**
048     * <p>
049     * A {@link FileSystem} backed by an FTP client provided by <a
050     * href="http://commons.apache.org/net/">Apache Commons Net</a>.
051     * </p>
052     */
053    @InterfaceAudience.Public
054    @InterfaceStability.Stable
055    public class FTPFileSystem extends FileSystem {
056    
057      public static final Log LOG = LogFactory
058          .getLog(FTPFileSystem.class);
059    
060      public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
061    
062      public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
063      public static final String FS_FTP_USER_PREFIX = "fs.ftp.user.";
064      public static final String FS_FTP_HOST = "fs.ftp.host";
065      public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
066      public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
067      public static final String E_SAME_DIRECTORY_ONLY =
068          "only same directory renames are supported";
069    
070      private URI uri;
071    
072      /**
073       * Return the protocol scheme for the FileSystem.
074       * <p/>
075       *
076       * @return <code>ftp</code>
077       */
078      @Override
079      public String getScheme() {
080        return "ftp";
081      }
082    
083      @Override
084      public void initialize(URI uri, Configuration conf) throws IOException { // get
085        super.initialize(uri, conf);
086        // get host information from uri (overrides info in conf)
087        String host = uri.getHost();
088        host = (host == null) ? conf.get(FS_FTP_HOST, null) : host;
089        if (host == null) {
090          throw new IOException("Invalid host specified");
091        }
092        conf.set(FS_FTP_HOST, host);
093    
094        // get port information from uri, (overrides info in conf)
095        int port = uri.getPort();
096        port = (port == -1) ? FTP.DEFAULT_PORT : port;
097        conf.setInt("fs.ftp.host.port", port);
098    
099        // get user/password information from URI (overrides info in conf)
100        String userAndPassword = uri.getUserInfo();
101        if (userAndPassword == null) {
102          userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf
103              .get("fs.ftp.password." + host, null));
104          if (userAndPassword == null) {
105            throw new IOException("Invalid user/passsword specified");
106          }
107        }
108        String[] userPasswdInfo = userAndPassword.split(":");
109        conf.set(FS_FTP_USER_PREFIX + host, userPasswdInfo[0]);
110        if (userPasswdInfo.length > 1) {
111          conf.set(FS_FTP_PASSWORD_PREFIX + host, userPasswdInfo[1]);
112        } else {
113          conf.set(FS_FTP_PASSWORD_PREFIX + host, null);
114        }
115        setConf(conf);
116        this.uri = uri;
117      }
118    
119      /**
120       * Connect to the FTP server using configuration parameters *
121       * 
122       * @return An FTPClient instance
123       * @throws IOException
124       */
125      private FTPClient connect() throws IOException {
126        FTPClient client = null;
127        Configuration conf = getConf();
128        String host = conf.get(FS_FTP_HOST);
129        int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
130        String user = conf.get(FS_FTP_USER_PREFIX + host);
131        String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
132        client = new FTPClient();
133        client.connect(host, port);
134        int reply = client.getReplyCode();
135        if (!FTPReply.isPositiveCompletion(reply)) {
136          throw NetUtils.wrapException(host, port,
137                       NetUtils.UNKNOWN_HOST, 0,
138                       new ConnectException("Server response " + reply));
139        } else if (client.login(user, password)) {
140          client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
141          client.setFileType(FTP.BINARY_FILE_TYPE);
142          client.setBufferSize(DEFAULT_BUFFER_SIZE);
143        } else {
144          throw new IOException("Login failed on server - " + host + ", port - "
145              + port + " as user '" + user + "'");
146        }
147    
148        return client;
149      }
150    
151      /**
152       * Logout and disconnect the given FTPClient. *
153       * 
154       * @param client
155       * @throws IOException
156       */
157      private void disconnect(FTPClient client) throws IOException {
158        if (client != null) {
159          if (!client.isConnected()) {
160            throw new FTPException("Client not connected");
161          }
162          boolean logoutSuccess = client.logout();
163          client.disconnect();
164          if (!logoutSuccess) {
165            LOG.warn("Logout failed while disconnecting, error code - "
166                + client.getReplyCode());
167          }
168        }
169      }
170    
171      /**
172       * Resolve against given working directory. *
173       * 
174       * @param workDir
175       * @param path
176       * @return
177       */
178      private Path makeAbsolute(Path workDir, Path path) {
179        if (path.isAbsolute()) {
180          return path;
181        }
182        return new Path(workDir, path);
183      }
184    
185      @Override
186      public FSDataInputStream open(Path file, int bufferSize) throws IOException {
187        FTPClient client = connect();
188        Path workDir = new Path(client.printWorkingDirectory());
189        Path absolute = makeAbsolute(workDir, file);
190        FileStatus fileStat = getFileStatus(client, absolute);
191        if (fileStat.isDirectory()) {
192          disconnect(client);
193          throw new FileNotFoundException("Path " + file + " is a directory.");
194        }
195        client.allocate(bufferSize);
196        Path parent = absolute.getParent();
197        // Change to parent directory on the
198        // server. Only then can we read the
199        // file
200        // on the server by opening up an InputStream. As a side effect the working
201        // directory on the server is changed to the parent directory of the file.
202        // The FTP client connection is closed when close() is called on the
203        // FSDataInputStream.
204        client.changeWorkingDirectory(parent.toUri().getPath());
205        InputStream is = client.retrieveFileStream(file.getName());
206        FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
207            client, statistics));
208        if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
209          // The ftpClient is an inconsistent state. Must close the stream
210          // which in turn will logout and disconnect from FTP server
211          fis.close();
212          throw new IOException("Unable to open file: " + file + ", Aborting");
213        }
214        return fis;
215      }
216    
217      /**
218       * A stream obtained via this call must be closed before using other APIs of
219       * this class or else the invocation will block.
220       */
221      @Override
222      public FSDataOutputStream create(Path file, FsPermission permission,
223          boolean overwrite, int bufferSize, short replication, long blockSize,
224          Progressable progress) throws IOException {
225        final FTPClient client = connect();
226        Path workDir = new Path(client.printWorkingDirectory());
227        Path absolute = makeAbsolute(workDir, file);
228        FileStatus status;
229        try {
230          status = getFileStatus(client, file);
231        } catch (FileNotFoundException fnfe) {
232          status = null;
233        }
234        if (status != null) {
235          if (overwrite && !status.isDirectory()) {
236            delete(client, file, false);
237          } else {
238            disconnect(client);
239            throw new FileAlreadyExistsException("File already exists: " + file);
240          }
241        }
242        
243        Path parent = absolute.getParent();
244        if (parent == null || !mkdirs(client, parent, FsPermission.getDirDefault())) {
245          parent = (parent == null) ? new Path("/") : parent;
246          disconnect(client);
247          throw new IOException("create(): Mkdirs failed to create: " + parent);
248        }
249        client.allocate(bufferSize);
250        // Change to parent directory on the server. Only then can we write to the
251        // file on the server by opening up an OutputStream. As a side effect the
252        // working directory on the server is changed to the parent directory of the
253        // file. The FTP client connection is closed when close() is called on the
254        // FSDataOutputStream.
255        client.changeWorkingDirectory(parent.toUri().getPath());
256        FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
257            .getName()), statistics) {
258          @Override
259          public void close() throws IOException {
260            super.close();
261            if (!client.isConnected()) {
262              throw new FTPException("Client not connected");
263            }
264            boolean cmdCompleted = client.completePendingCommand();
265            disconnect(client);
266            if (!cmdCompleted) {
267              throw new FTPException("Could not complete transfer, Reply Code - "
268                  + client.getReplyCode());
269            }
270          }
271        };
272        if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
273          // The ftpClient is an inconsistent state. Must close the stream
274          // which in turn will logout and disconnect from FTP server
275          fos.close();
276          throw new IOException("Unable to create file: " + file + ", Aborting");
277        }
278        return fos;
279      }
280    
281      /** This optional operation is not yet supported. */
282      @Override
283      public FSDataOutputStream append(Path f, int bufferSize,
284          Progressable progress) throws IOException {
285        throw new IOException("Not supported");
286      }
287      
288      /**
289       * Convenience method, so that we don't open a new connection when using this
290       * method from within another method. Otherwise every API invocation incurs
291       * the overhead of opening/closing a TCP connection.
292       * @throws IOException on IO problems other than FileNotFoundException
293       */
294      private boolean exists(FTPClient client, Path file) throws IOException {
295        try {
296          return getFileStatus(client, file) != null;
297        } catch (FileNotFoundException fnfe) {
298          return false;
299        }
300      }
301    
302      @Override
303      public boolean delete(Path file, boolean recursive) throws IOException {
304        FTPClient client = connect();
305        try {
306          boolean success = delete(client, file, recursive);
307          return success;
308        } finally {
309          disconnect(client);
310        }
311      }
312    
313      /**
314       * Convenience method, so that we don't open a new connection when using this
315       * method from within another method. Otherwise every API invocation incurs
316       * the overhead of opening/closing a TCP connection.
317       */
318      private boolean delete(FTPClient client, Path file, boolean recursive)
319          throws IOException {
320        Path workDir = new Path(client.printWorkingDirectory());
321        Path absolute = makeAbsolute(workDir, file);
322        String pathName = absolute.toUri().getPath();
323        try {
324          FileStatus fileStat = getFileStatus(client, absolute);
325          if (fileStat.isFile()) {
326            return client.deleteFile(pathName);
327          }
328        } catch (FileNotFoundException e) {
329          //the file is not there
330          return false;
331        }
332        FileStatus[] dirEntries = listStatus(client, absolute);
333        if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
334          throw new IOException("Directory: " + file + " is not empty.");
335        }
336        if (dirEntries != null) {
337          for (int i = 0; i < dirEntries.length; i++) {
338            delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
339          }
340        }
341        return client.removeDirectory(pathName);
342      }
343    
344      private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
345        FsAction action = FsAction.NONE;
346        if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
347          action.or(FsAction.READ);
348        }
349        if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
350          action.or(FsAction.WRITE);
351        }
352        if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
353          action.or(FsAction.EXECUTE);
354        }
355        return action;
356      }
357    
358      private FsPermission getPermissions(FTPFile ftpFile) {
359        FsAction user, group, others;
360        user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
361        group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
362        others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
363        return new FsPermission(user, group, others);
364      }
365    
366      @Override
367      public URI getUri() {
368        return uri;
369      }
370    
371      @Override
372      public FileStatus[] listStatus(Path file) throws IOException {
373        FTPClient client = connect();
374        try {
375          FileStatus[] stats = listStatus(client, file);
376          return stats;
377        } finally {
378          disconnect(client);
379        }
380      }
381    
382      /**
383       * Convenience method, so that we don't open a new connection when using this
384       * method from within another method. Otherwise every API invocation incurs
385       * the overhead of opening/closing a TCP connection.
386       */
387      private FileStatus[] listStatus(FTPClient client, Path file)
388          throws IOException {
389        Path workDir = new Path(client.printWorkingDirectory());
390        Path absolute = makeAbsolute(workDir, file);
391        FileStatus fileStat = getFileStatus(client, absolute);
392        if (fileStat.isFile()) {
393          return new FileStatus[] { fileStat };
394        }
395        FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
396        FileStatus[] fileStats = new FileStatus[ftpFiles.length];
397        for (int i = 0; i < ftpFiles.length; i++) {
398          fileStats[i] = getFileStatus(ftpFiles[i], absolute);
399        }
400        return fileStats;
401      }
402    
403      @Override
404      public FileStatus getFileStatus(Path file) throws IOException {
405        FTPClient client = connect();
406        try {
407          FileStatus status = getFileStatus(client, file);
408          return status;
409        } finally {
410          disconnect(client);
411        }
412      }
413    
414      /**
415       * Convenience method, so that we don't open a new connection when using this
416       * method from within another method. Otherwise every API invocation incurs
417       * the overhead of opening/closing a TCP connection.
418       */
419      private FileStatus getFileStatus(FTPClient client, Path file)
420          throws IOException {
421        FileStatus fileStat = null;
422        Path workDir = new Path(client.printWorkingDirectory());
423        Path absolute = makeAbsolute(workDir, file);
424        Path parentPath = absolute.getParent();
425        if (parentPath == null) { // root dir
426          long length = -1; // Length of root dir on server not known
427          boolean isDir = true;
428          int blockReplication = 1;
429          long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
430          long modTime = -1; // Modification time of root dir not known.
431          Path root = new Path("/");
432          return new FileStatus(length, isDir, blockReplication, blockSize,
433              modTime, root.makeQualified(this));
434        }
435        String pathName = parentPath.toUri().getPath();
436        FTPFile[] ftpFiles = client.listFiles(pathName);
437        if (ftpFiles != null) {
438          for (FTPFile ftpFile : ftpFiles) {
439            if (ftpFile.getName().equals(file.getName())) { // file found in dir
440              fileStat = getFileStatus(ftpFile, parentPath);
441              break;
442            }
443          }
444          if (fileStat == null) {
445            throw new FileNotFoundException("File " + file + " does not exist.");
446          }
447        } else {
448          throw new FileNotFoundException("File " + file + " does not exist.");
449        }
450        return fileStat;
451      }
452    
453      /**
454       * Convert the file information in FTPFile to a {@link FileStatus} object. *
455       * 
456       * @param ftpFile
457       * @param parentPath
458       * @return FileStatus
459       */
460      private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
461        long length = ftpFile.getSize();
462        boolean isDir = ftpFile.isDirectory();
463        int blockReplication = 1;
464        // Using default block size since there is no way in FTP client to know of
465        // block sizes on server. The assumption could be less than ideal.
466        long blockSize = DEFAULT_BLOCK_SIZE;
467        long modTime = ftpFile.getTimestamp().getTimeInMillis();
468        long accessTime = 0;
469        FsPermission permission = getPermissions(ftpFile);
470        String user = ftpFile.getUser();
471        String group = ftpFile.getGroup();
472        Path filePath = new Path(parentPath, ftpFile.getName());
473        return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
474            accessTime, permission, user, group, filePath.makeQualified(this));
475      }
476    
477      @Override
478      public boolean mkdirs(Path file, FsPermission permission) throws IOException {
479        FTPClient client = connect();
480        try {
481          boolean success = mkdirs(client, file, permission);
482          return success;
483        } finally {
484          disconnect(client);
485        }
486      }
487    
488      /**
489       * Convenience method, so that we don't open a new connection when using this
490       * method from within another method. Otherwise every API invocation incurs
491       * the overhead of opening/closing a TCP connection.
492       */
493      private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
494          throws IOException {
495        boolean created = true;
496        Path workDir = new Path(client.printWorkingDirectory());
497        Path absolute = makeAbsolute(workDir, file);
498        String pathName = absolute.getName();
499        if (!exists(client, absolute)) {
500          Path parent = absolute.getParent();
501          created = (parent == null || mkdirs(client, parent, FsPermission
502              .getDirDefault()));
503          if (created) {
504            String parentDir = parent.toUri().getPath();
505            client.changeWorkingDirectory(parentDir);
506            created = created && client.makeDirectory(pathName);
507          }
508        } else if (isFile(client, absolute)) {
509          throw new ParentNotDirectoryException(String.format(
510              "Can't make directory for path %s since it is a file.", absolute));
511        }
512        return created;
513      }
514    
515      /**
516       * Convenience method, so that we don't open a new connection when using this
517       * method from within another method. Otherwise every API invocation incurs
518       * the overhead of opening/closing a TCP connection.
519       */
520      private boolean isFile(FTPClient client, Path file) {
521        try {
522          return getFileStatus(client, file).isFile();
523        } catch (FileNotFoundException e) {
524          return false; // file does not exist
525        } catch (IOException ioe) {
526          throw new FTPException("File check failed", ioe);
527        }
528      }
529    
530      /*
531       * Assuming that parent of both source and destination is the same. Is the
532       * assumption correct or it is suppose to work like 'move' ?
533       */
534      @Override
535      public boolean rename(Path src, Path dst) throws IOException {
536        FTPClient client = connect();
537        try {
538          boolean success = rename(client, src, dst);
539          return success;
540        } finally {
541          disconnect(client);
542        }
543      }
544    
545      /**
546       * Probe for a path being a parent of another
547       * @param parent parent path
548       * @param child possible child path
549       * @return true if the parent's path matches the start of the child's
550       */
551      private boolean isParentOf(Path parent, Path child) {
552        URI parentURI = parent.toUri();
553        String parentPath = parentURI.getPath();
554        if (!parentPath.endsWith("/")) {
555          parentPath += "/";
556        }
557        URI childURI = child.toUri();
558        String childPath = childURI.getPath();
559        return childPath.startsWith(parentPath);
560      }
561    
562      /**
563       * Convenience method, so that we don't open a new connection when using this
564       * method from within another method. Otherwise every API invocation incurs
565       * the overhead of opening/closing a TCP connection.
566       * 
567       * @param client
568       * @param src
569       * @param dst
570       * @return
571       * @throws IOException
572       */
573      private boolean rename(FTPClient client, Path src, Path dst)
574          throws IOException {
575        Path workDir = new Path(client.printWorkingDirectory());
576        Path absoluteSrc = makeAbsolute(workDir, src);
577        Path absoluteDst = makeAbsolute(workDir, dst);
578        if (!exists(client, absoluteSrc)) {
579          throw new FileNotFoundException("Source path " + src + " does not exist");
580        }
581        if (isDirectory(absoluteDst)) {
582          // destination is a directory: rename goes underneath it with the
583          // source name
584          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
585        }
586        if (exists(client, absoluteDst)) {
587          throw new FileAlreadyExistsException("Destination path " + dst
588              + " already exists");
589        }
590        String parentSrc = absoluteSrc.getParent().toUri().toString();
591        String parentDst = absoluteDst.getParent().toUri().toString();
592        if (isParentOf(absoluteSrc, absoluteDst)) {
593          throw new IOException("Cannot rename " + absoluteSrc + " under itself"
594          + " : "+ absoluteDst);
595        }
596    
597        if (!parentSrc.equals(parentDst)) {
598          throw new IOException("Cannot rename source: " + absoluteSrc
599              + " to " + absoluteDst
600              + " -"+ E_SAME_DIRECTORY_ONLY);
601        }
602        String from = absoluteSrc.getName();
603        String to = absoluteDst.getName();
604        client.changeWorkingDirectory(parentSrc);
605        boolean renamed = client.rename(from, to);
606        return renamed;
607      }
608    
609      @Override
610      public Path getWorkingDirectory() {
611        // Return home directory always since we do not maintain state.
612        return getHomeDirectory();
613      }
614    
615      @Override
616      public Path getHomeDirectory() {
617        FTPClient client = null;
618        try {
619          client = connect();
620          Path homeDir = new Path(client.printWorkingDirectory());
621          return homeDir;
622        } catch (IOException ioe) {
623          throw new FTPException("Failed to get home directory", ioe);
624        } finally {
625          try {
626            disconnect(client);
627          } catch (IOException ioe) {
628            throw new FTPException("Failed to disconnect", ioe);
629          }
630        }
631      }
632    
633      @Override
634      public void setWorkingDirectory(Path newDir) {
635        // we do not maintain the working directory state
636      }
637    }