001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.fs.ftp;
019    
020    import java.io.FileNotFoundException;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.net.URI;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.commons.net.ftp.FTP;
028    import org.apache.commons.net.ftp.FTPClient;
029    import org.apache.commons.net.ftp.FTPFile;
030    import org.apache.commons.net.ftp.FTPReply;
031    import org.apache.hadoop.classification.InterfaceAudience;
032    import org.apache.hadoop.classification.InterfaceStability;
033    import org.apache.hadoop.conf.Configuration;
034    import org.apache.hadoop.fs.FSDataInputStream;
035    import org.apache.hadoop.fs.FSDataOutputStream;
036    import org.apache.hadoop.fs.FileStatus;
037    import org.apache.hadoop.fs.FileSystem;
038    import org.apache.hadoop.fs.Path;
039    import org.apache.hadoop.fs.permission.FsAction;
040    import org.apache.hadoop.fs.permission.FsPermission;
041    import org.apache.hadoop.util.Progressable;
042    
043    /**
044     * <p>
045     * A {@link FileSystem} backed by an FTP client provided by <a
046     * href="http://commons.apache.org/net/">Apache Commons Net</a>.
047     * </p>
048     */
049    @InterfaceAudience.Public
050    @InterfaceStability.Stable
051    public class FTPFileSystem extends FileSystem {
052    
053      public static final Log LOG = LogFactory
054          .getLog(FTPFileSystem.class);
055    
056      public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
057    
058      public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
059    
060      private URI uri;
061    
062      /**
063       * Return the protocol scheme for the FileSystem.
064       * <p/>
065       *
066       * @return <code>ftp</code>
067       */
068      @Override
069      public String getScheme() {
070        return "ftp";
071      }
072    
073      @Override
074      public void initialize(URI uri, Configuration conf) throws IOException { // get
075        super.initialize(uri, conf);
076        // get host information from uri (overrides info in conf)
077        String host = uri.getHost();
078        host = (host == null) ? conf.get("fs.ftp.host", null) : host;
079        if (host == null) {
080          throw new IOException("Invalid host specified");
081        }
082        conf.set("fs.ftp.host", host);
083    
084        // get port information from uri, (overrides info in conf)
085        int port = uri.getPort();
086        port = (port == -1) ? FTP.DEFAULT_PORT : port;
087        conf.setInt("fs.ftp.host.port", port);
088    
089        // get user/password information from URI (overrides info in conf)
090        String userAndPassword = uri.getUserInfo();
091        if (userAndPassword == null) {
092          userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf
093              .get("fs.ftp.password." + host, null));
094          if (userAndPassword == null) {
095            throw new IOException("Invalid user/passsword specified");
096          }
097        }
098        String[] userPasswdInfo = userAndPassword.split(":");
099        conf.set("fs.ftp.user." + host, userPasswdInfo[0]);
100        if (userPasswdInfo.length > 1) {
101          conf.set("fs.ftp.password." + host, userPasswdInfo[1]);
102        } else {
103          conf.set("fs.ftp.password." + host, null);
104        }
105        setConf(conf);
106        this.uri = uri;
107      }
108    
109      /**
110       * Connect to the FTP server using configuration parameters *
111       * 
112       * @return An FTPClient instance
113       * @throws IOException
114       */
115      private FTPClient connect() throws IOException {
116        FTPClient client = null;
117        Configuration conf = getConf();
118        String host = conf.get("fs.ftp.host");
119        int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
120        String user = conf.get("fs.ftp.user." + host);
121        String password = conf.get("fs.ftp.password." + host);
122        client = new FTPClient();
123        client.connect(host, port);
124        int reply = client.getReplyCode();
125        if (!FTPReply.isPositiveCompletion(reply)) {
126          throw new IOException("Server - " + host
127              + " refused connection on port - " + port);
128        } else if (client.login(user, password)) {
129          client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
130          client.setFileType(FTP.BINARY_FILE_TYPE);
131          client.setBufferSize(DEFAULT_BUFFER_SIZE);
132        } else {
133          throw new IOException("Login failed on server - " + host + ", port - "
134              + port);
135        }
136    
137        return client;
138      }
139    
140      /**
141       * Logout and disconnect the given FTPClient. *
142       * 
143       * @param client
144       * @throws IOException
145       */
146      private void disconnect(FTPClient client) throws IOException {
147        if (client != null) {
148          if (!client.isConnected()) {
149            throw new FTPException("Client not connected");
150          }
151          boolean logoutSuccess = client.logout();
152          client.disconnect();
153          if (!logoutSuccess) {
154            LOG.warn("Logout failed while disconnecting, error code - "
155                + client.getReplyCode());
156          }
157        }
158      }
159    
160      /**
161       * Resolve against given working directory. *
162       * 
163       * @param workDir
164       * @param path
165       * @return
166       */
167      private Path makeAbsolute(Path workDir, Path path) {
168        if (path.isAbsolute()) {
169          return path;
170        }
171        return new Path(workDir, path);
172      }
173    
174      @Override
175      public FSDataInputStream open(Path file, int bufferSize) throws IOException {
176        FTPClient client = connect();
177        Path workDir = new Path(client.printWorkingDirectory());
178        Path absolute = makeAbsolute(workDir, file);
179        FileStatus fileStat = getFileStatus(client, absolute);
180        if (fileStat.isDirectory()) {
181          disconnect(client);
182          throw new IOException("Path " + file + " is a directory.");
183        }
184        client.allocate(bufferSize);
185        Path parent = absolute.getParent();
186        // Change to parent directory on the
187        // server. Only then can we read the
188        // file
189        // on the server by opening up an InputStream. As a side effect the working
190        // directory on the server is changed to the parent directory of the file.
191        // The FTP client connection is closed when close() is called on the
192        // FSDataInputStream.
193        client.changeWorkingDirectory(parent.toUri().getPath());
194        InputStream is = client.retrieveFileStream(file.getName());
195        FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
196            client, statistics));
197        if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
198          // The ftpClient is an inconsistent state. Must close the stream
199          // which in turn will logout and disconnect from FTP server
200          fis.close();
201          throw new IOException("Unable to open file: " + file + ", Aborting");
202        }
203        return fis;
204      }
205    
206      /**
207       * A stream obtained via this call must be closed before using other APIs of
208       * this class or else the invocation will block.
209       */
210      @Override
211      public FSDataOutputStream create(Path file, FsPermission permission,
212          boolean overwrite, int bufferSize, short replication, long blockSize,
213          Progressable progress) throws IOException {
214        final FTPClient client = connect();
215        Path workDir = new Path(client.printWorkingDirectory());
216        Path absolute = makeAbsolute(workDir, file);
217        if (exists(client, file)) {
218          if (overwrite) {
219            delete(client, file);
220          } else {
221            disconnect(client);
222            throw new IOException("File already exists: " + file);
223          }
224        }
225        
226        Path parent = absolute.getParent();
227        if (parent == null || !mkdirs(client, parent, FsPermission.getDirDefault())) {
228          parent = (parent == null) ? new Path("/") : parent;
229          disconnect(client);
230          throw new IOException("create(): Mkdirs failed to create: " + parent);
231        }
232        client.allocate(bufferSize);
233        // Change to parent directory on the server. Only then can we write to the
234        // file on the server by opening up an OutputStream. As a side effect the
235        // working directory on the server is changed to the parent directory of the
236        // file. The FTP client connection is closed when close() is called on the
237        // FSDataOutputStream.
238        client.changeWorkingDirectory(parent.toUri().getPath());
239        FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
240            .getName()), statistics) {
241          @Override
242          public void close() throws IOException {
243            super.close();
244            if (!client.isConnected()) {
245              throw new FTPException("Client not connected");
246            }
247            boolean cmdCompleted = client.completePendingCommand();
248            disconnect(client);
249            if (!cmdCompleted) {
250              throw new FTPException("Could not complete transfer, Reply Code - "
251                  + client.getReplyCode());
252            }
253          }
254        };
255        if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
256          // The ftpClient is an inconsistent state. Must close the stream
257          // which in turn will logout and disconnect from FTP server
258          fos.close();
259          throw new IOException("Unable to create file: " + file + ", Aborting");
260        }
261        return fos;
262      }
263    
264      /** This optional operation is not yet supported. */
265      @Override
266      public FSDataOutputStream append(Path f, int bufferSize,
267          Progressable progress) throws IOException {
268        throw new IOException("Not supported");
269      }
270      
271      /**
272       * Convenience method, so that we don't open a new connection when using this
273       * method from within another method. Otherwise every API invocation incurs
274       * the overhead of opening/closing a TCP connection.
275       */
276      private boolean exists(FTPClient client, Path file) {
277        try {
278          return getFileStatus(client, file) != null;
279        } catch (FileNotFoundException fnfe) {
280          return false;
281        } catch (IOException ioe) {
282          throw new FTPException("Failed to get file status", ioe);
283        }
284      }
285    
286      @Override
287      public boolean delete(Path file, boolean recursive) throws IOException {
288        FTPClient client = connect();
289        try {
290          boolean success = delete(client, file, recursive);
291          return success;
292        } finally {
293          disconnect(client);
294        }
295      }
296    
297      /** @deprecated Use delete(Path, boolean) instead */
298      @Deprecated
299      private boolean delete(FTPClient client, Path file) throws IOException {
300        return delete(client, file, false);
301      }
302    
303      /**
304       * Convenience method, so that we don't open a new connection when using this
305       * method from within another method. Otherwise every API invocation incurs
306       * the overhead of opening/closing a TCP connection.
307       */
308      private boolean delete(FTPClient client, Path file, boolean recursive)
309          throws IOException {
310        Path workDir = new Path(client.printWorkingDirectory());
311        Path absolute = makeAbsolute(workDir, file);
312        String pathName = absolute.toUri().getPath();
313        FileStatus fileStat = getFileStatus(client, absolute);
314        if (fileStat.isFile()) {
315          return client.deleteFile(pathName);
316        }
317        FileStatus[] dirEntries = listStatus(client, absolute);
318        if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
319          throw new IOException("Directory: " + file + " is not empty.");
320        }
321        if (dirEntries != null) {
322          for (int i = 0; i < dirEntries.length; i++) {
323            delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
324          }
325        }
326        return client.removeDirectory(pathName);
327      }
328    
329      private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
330        FsAction action = FsAction.NONE;
331        if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
332          action.or(FsAction.READ);
333        }
334        if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
335          action.or(FsAction.WRITE);
336        }
337        if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
338          action.or(FsAction.EXECUTE);
339        }
340        return action;
341      }
342    
343      private FsPermission getPermissions(FTPFile ftpFile) {
344        FsAction user, group, others;
345        user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
346        group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
347        others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
348        return new FsPermission(user, group, others);
349      }
350    
351      @Override
352      public URI getUri() {
353        return uri;
354      }
355    
356      @Override
357      public FileStatus[] listStatus(Path file) throws IOException {
358        FTPClient client = connect();
359        try {
360          FileStatus[] stats = listStatus(client, file);
361          return stats;
362        } finally {
363          disconnect(client);
364        }
365      }
366    
367      /**
368       * Convenience method, so that we don't open a new connection when using this
369       * method from within another method. Otherwise every API invocation incurs
370       * the overhead of opening/closing a TCP connection.
371       */
372      private FileStatus[] listStatus(FTPClient client, Path file)
373          throws IOException {
374        Path workDir = new Path(client.printWorkingDirectory());
375        Path absolute = makeAbsolute(workDir, file);
376        FileStatus fileStat = getFileStatus(client, absolute);
377        if (fileStat.isFile()) {
378          return new FileStatus[] { fileStat };
379        }
380        FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
381        FileStatus[] fileStats = new FileStatus[ftpFiles.length];
382        for (int i = 0; i < ftpFiles.length; i++) {
383          fileStats[i] = getFileStatus(ftpFiles[i], absolute);
384        }
385        return fileStats;
386      }
387    
388      @Override
389      public FileStatus getFileStatus(Path file) throws IOException {
390        FTPClient client = connect();
391        try {
392          FileStatus status = getFileStatus(client, file);
393          return status;
394        } finally {
395          disconnect(client);
396        }
397      }
398    
399      /**
400       * Convenience method, so that we don't open a new connection when using this
401       * method from within another method. Otherwise every API invocation incurs
402       * the overhead of opening/closing a TCP connection.
403       */
404      private FileStatus getFileStatus(FTPClient client, Path file)
405          throws IOException {
406        FileStatus fileStat = null;
407        Path workDir = new Path(client.printWorkingDirectory());
408        Path absolute = makeAbsolute(workDir, file);
409        Path parentPath = absolute.getParent();
410        if (parentPath == null) { // root dir
411          long length = -1; // Length of root dir on server not known
412          boolean isDir = true;
413          int blockReplication = 1;
414          long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
415          long modTime = -1; // Modification time of root dir not known.
416          Path root = new Path("/");
417          return new FileStatus(length, isDir, blockReplication, blockSize,
418              modTime, root.makeQualified(this));
419        }
420        String pathName = parentPath.toUri().getPath();
421        FTPFile[] ftpFiles = client.listFiles(pathName);
422        if (ftpFiles != null) {
423          for (FTPFile ftpFile : ftpFiles) {
424            if (ftpFile.getName().equals(file.getName())) { // file found in dir
425              fileStat = getFileStatus(ftpFile, parentPath);
426              break;
427            }
428          }
429          if (fileStat == null) {
430            throw new FileNotFoundException("File " + file + " does not exist.");
431          }
432        } else {
433          throw new FileNotFoundException("File " + file + " does not exist.");
434        }
435        return fileStat;
436      }
437    
438      /**
439       * Convert the file information in FTPFile to a {@link FileStatus} object. *
440       * 
441       * @param ftpFile
442       * @param parentPath
443       * @return FileStatus
444       */
445      private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
446        long length = ftpFile.getSize();
447        boolean isDir = ftpFile.isDirectory();
448        int blockReplication = 1;
449        // Using default block size since there is no way in FTP client to know of
450        // block sizes on server. The assumption could be less than ideal.
451        long blockSize = DEFAULT_BLOCK_SIZE;
452        long modTime = ftpFile.getTimestamp().getTimeInMillis();
453        long accessTime = 0;
454        FsPermission permission = getPermissions(ftpFile);
455        String user = ftpFile.getUser();
456        String group = ftpFile.getGroup();
457        Path filePath = new Path(parentPath, ftpFile.getName());
458        return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
459            accessTime, permission, user, group, filePath.makeQualified(this));
460      }
461    
462      @Override
463      public boolean mkdirs(Path file, FsPermission permission) throws IOException {
464        FTPClient client = connect();
465        try {
466          boolean success = mkdirs(client, file, permission);
467          return success;
468        } finally {
469          disconnect(client);
470        }
471      }
472    
473      /**
474       * Convenience method, so that we don't open a new connection when using this
475       * method from within another method. Otherwise every API invocation incurs
476       * the overhead of opening/closing a TCP connection.
477       */
478      private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
479          throws IOException {
480        boolean created = true;
481        Path workDir = new Path(client.printWorkingDirectory());
482        Path absolute = makeAbsolute(workDir, file);
483        String pathName = absolute.getName();
484        if (!exists(client, absolute)) {
485          Path parent = absolute.getParent();
486          created = (parent == null || mkdirs(client, parent, FsPermission
487              .getDirDefault()));
488          if (created) {
489            String parentDir = parent.toUri().getPath();
490            client.changeWorkingDirectory(parentDir);
491            created = created && client.makeDirectory(pathName);
492          }
493        } else if (isFile(client, absolute)) {
494          throw new IOException(String.format(
495              "Can't make directory for path %s since it is a file.", absolute));
496        }
497        return created;
498      }
499    
500      /**
501       * Convenience method, so that we don't open a new connection when using this
502       * method from within another method. Otherwise every API invocation incurs
503       * the overhead of opening/closing a TCP connection.
504       */
505      private boolean isFile(FTPClient client, Path file) {
506        try {
507          return getFileStatus(client, file).isFile();
508        } catch (FileNotFoundException e) {
509          return false; // file does not exist
510        } catch (IOException ioe) {
511          throw new FTPException("File check failed", ioe);
512        }
513      }
514    
515      /*
516       * Assuming that parent of both source and destination is the same. Is the
517       * assumption correct or it is suppose to work like 'move' ?
518       */
519      @Override
520      public boolean rename(Path src, Path dst) throws IOException {
521        FTPClient client = connect();
522        try {
523          boolean success = rename(client, src, dst);
524          return success;
525        } finally {
526          disconnect(client);
527        }
528      }
529    
530      /**
531       * Convenience method, so that we don't open a new connection when using this
532       * method from within another method. Otherwise every API invocation incurs
533       * the overhead of opening/closing a TCP connection.
534       * 
535       * @param client
536       * @param src
537       * @param dst
538       * @return
539       * @throws IOException
540       */
541      private boolean rename(FTPClient client, Path src, Path dst)
542          throws IOException {
543        Path workDir = new Path(client.printWorkingDirectory());
544        Path absoluteSrc = makeAbsolute(workDir, src);
545        Path absoluteDst = makeAbsolute(workDir, dst);
546        if (!exists(client, absoluteSrc)) {
547          throw new IOException("Source path " + src + " does not exist");
548        }
549        if (exists(client, absoluteDst)) {
550          throw new IOException("Destination path " + dst
551              + " already exist, cannot rename!");
552        }
553        String parentSrc = absoluteSrc.getParent().toUri().toString();
554        String parentDst = absoluteDst.getParent().toUri().toString();
555        String from = src.getName();
556        String to = dst.getName();
557        if (!parentSrc.equals(parentDst)) {
558          throw new IOException("Cannot rename parent(source): " + parentSrc
559              + ", parent(destination):  " + parentDst);
560        }
561        client.changeWorkingDirectory(parentSrc);
562        boolean renamed = client.rename(from, to);
563        return renamed;
564      }
565    
566      @Override
567      public Path getWorkingDirectory() {
568        // Return home directory always since we do not maintain state.
569        return getHomeDirectory();
570      }
571    
572      @Override
573      public Path getHomeDirectory() {
574        FTPClient client = null;
575        try {
576          client = connect();
577          Path homeDir = new Path(client.printWorkingDirectory());
578          return homeDir;
579        } catch (IOException ioe) {
580          throw new FTPException("Failed to get home directory", ioe);
581        } finally {
582          try {
583            disconnect(client);
584          } catch (IOException ioe) {
585            throw new FTPException("Failed to disconnect", ioe);
586          }
587        }
588      }
589    
590      @Override
591      public void setWorkingDirectory(Path newDir) {
592        // we do not maintain the working directory state
593      }
594    }