001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs.sftp;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024import java.net.URI;
025import java.net.URLDecoder;
026import java.util.ArrayList;
027import java.util.Vector;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FSDataInputStream;
033import org.apache.hadoop.fs.FSDataOutputStream;
034import org.apache.hadoop.fs.FileStatus;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.fs.permission.FsPermission;
038import org.apache.hadoop.util.Progressable;
039
040import com.jcraft.jsch.ChannelSftp;
041import com.jcraft.jsch.ChannelSftp.LsEntry;
042import com.jcraft.jsch.SftpATTRS;
043import com.jcraft.jsch.SftpException;
044
045/** SFTP FileSystem. */
046public class SFTPFileSystem extends FileSystem {
047
048  public static final Log LOG = LogFactory.getLog(SFTPFileSystem.class);
049
050  private SFTPConnectionPool connectionPool;
051  private URI uri;
052
053  private static final int DEFAULT_SFTP_PORT = 22;
054  private static final int DEFAULT_MAX_CONNECTION = 5;
055  public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
056  public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
057  public static final String FS_SFTP_USER_PREFIX = "fs.sftp.user.";
058  public static final String FS_SFTP_PASSWORD_PREFIX = "fs.sftp.password.";
059  public static final String FS_SFTP_HOST = "fs.sftp.host";
060  public static final String FS_SFTP_HOST_PORT = "fs.sftp.host.port";
061  public static final String FS_SFTP_KEYFILE = "fs.sftp.keyfile";
062  public static final String FS_SFTP_CONNECTION_MAX = "fs.sftp.connection.max";
063  public static final String E_SAME_DIRECTORY_ONLY =
064      "only same directory renames are supported";
065  public static final String E_HOST_NULL = "Invalid host specified";
066  public static final String E_USER_NULL =
067      "No user specified for sftp connection. Expand URI or credential file.";
068  public static final String E_PATH_DIR = "Path %s is a directory.";
069  public static final String E_FILE_STATUS = "Failed to get file status";
070  public static final String E_FILE_NOTFOUND = "File %s does not exist.";
071  public static final String E_FILE_EXIST = "File already exists: %s";
072  public static final String E_CREATE_DIR =
073      "create(): Mkdirs failed to create: %s";
074  public static final String E_DIR_CREATE_FROMFILE =
075      "Can't make directory for path %s since it is a file.";
076  public static final String E_MAKE_DIR_FORPATH =
077      "Can't make directory for path \"%s\" under \"%s\".";
078  public static final String E_DIR_NOTEMPTY = "Directory: %s is not empty.";
079  public static final String E_FILE_CHECK_FAILED = "File check failed";
080  public static final String E_NOT_SUPPORTED = "Not supported";
081  public static final String E_SPATH_NOTEXIST = "Source path %s does not exist";
082  public static final String E_DPATH_EXIST =
083      "Destination path %s already exist, cannot rename!";
084  public static final String E_FAILED_GETHOME = "Failed to get home directory";
085  public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
086
087  /**
088   * Set configuration from UI.
089   *
090   * @param uri
091   * @param conf
092   * @throws IOException
093   */
094  private void setConfigurationFromURI(URI uriInfo, Configuration conf)
095      throws IOException {
096
097    // get host information from URI
098    String host = uriInfo.getHost();
099    host = (host == null) ? conf.get(FS_SFTP_HOST, null) : host;
100    if (host == null) {
101      throw new IOException(E_HOST_NULL);
102    }
103    conf.set(FS_SFTP_HOST, host);
104
105    int port = uriInfo.getPort();
106    port = (port == -1)
107      ? conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT)
108      : port;
109    conf.setInt(FS_SFTP_HOST_PORT, port);
110
111    // get user/password information from URI
112    String userAndPwdFromUri = uriInfo.getUserInfo();
113    if (userAndPwdFromUri != null) {
114      String[] userPasswdInfo = userAndPwdFromUri.split(":");
115      String user = userPasswdInfo[0];
116      user = URLDecoder.decode(user, "UTF-8");
117      conf.set(FS_SFTP_USER_PREFIX + host, user);
118      if (userPasswdInfo.length > 1) {
119        conf.set(FS_SFTP_PASSWORD_PREFIX + host + "." +
120            user, userPasswdInfo[1]);
121      }
122    }
123
124    String user = conf.get(FS_SFTP_USER_PREFIX + host);
125    if (user == null || user.equals("")) {
126      throw new IllegalStateException(E_USER_NULL);
127    }
128
129    int connectionMax =
130        conf.getInt(FS_SFTP_CONNECTION_MAX, DEFAULT_MAX_CONNECTION);
131    connectionPool = new SFTPConnectionPool(connectionMax);
132  }
133
134  /**
135   * Connecting by using configuration parameters.
136   *
137   * @return An FTPClient instance
138   * @throws IOException
139   */
140  private ChannelSftp connect() throws IOException {
141    Configuration conf = getConf();
142
143    String host = conf.get(FS_SFTP_HOST, null);
144    int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
145    String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
146    String pwd = conf.get(FS_SFTP_PASSWORD_PREFIX + host + "." + user, null);
147    String keyFile = conf.get(FS_SFTP_KEYFILE, null);
148
149    ChannelSftp channel =
150        connectionPool.connect(host, port, user, pwd, keyFile);
151
152    return channel;
153  }
154
155  /**
156   * Logout and disconnect the given channel.
157   *
158   * @param client
159   * @throws IOException
160   */
161  private void disconnect(ChannelSftp channel) throws IOException {
162    connectionPool.disconnect(channel);
163  }
164
165  /**
166   * Resolve against given working directory.
167   *
168   * @param workDir
169   * @param path
170   * @return absolute path
171   */
172  private Path makeAbsolute(Path workDir, Path path) {
173    if (path.isAbsolute()) {
174      return path;
175    }
176    return new Path(workDir, path);
177  }
178
179  /**
180   * Convenience method, so that we don't open a new connection when using this
181   * method from within another method. Otherwise every API invocation incurs
182   * the overhead of opening/closing a TCP connection.
183   * @throws IOException
184   */
185  private boolean exists(ChannelSftp channel, Path file) throws IOException {
186    try {
187      getFileStatus(channel, file);
188      return true;
189    } catch (FileNotFoundException fnfe) {
190      return false;
191    } catch (IOException ioe) {
192      throw new IOException(E_FILE_STATUS, ioe);
193    }
194  }
195
196  /**
197   * Convenience method, so that we don't open a new connection when using this
198   * method from within another method. Otherwise every API invocation incurs
199   * the overhead of opening/closing a TCP connection.
200   */
201  @SuppressWarnings("unchecked")
202  private FileStatus getFileStatus(ChannelSftp client, Path file)
203      throws IOException {
204    FileStatus fileStat = null;
205    Path workDir;
206    try {
207      workDir = new Path(client.pwd());
208    } catch (SftpException e) {
209      throw new IOException(e);
210    }
211    Path absolute = makeAbsolute(workDir, file);
212    Path parentPath = absolute.getParent();
213    if (parentPath == null) { // root directory
214      long length = -1; // Length of root directory on server not known
215      boolean isDir = true;
216      int blockReplication = 1;
217      long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
218      long modTime = -1; // Modification time of root directory not known.
219      Path root = new Path("/");
220      return new FileStatus(length, isDir, blockReplication, blockSize,
221          modTime,
222          root.makeQualified(this.getUri(), this.getWorkingDirectory()));
223    }
224    String pathName = parentPath.toUri().getPath();
225    Vector<LsEntry> sftpFiles;
226    try {
227      sftpFiles = (Vector<LsEntry>) client.ls(pathName);
228    } catch (SftpException e) {
229      throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
230    }
231    if (sftpFiles != null) {
232      for (LsEntry sftpFile : sftpFiles) {
233        if (sftpFile.getFilename().equals(file.getName())) {
234          // file found in directory
235          fileStat = getFileStatus(client, sftpFile, parentPath);
236          break;
237        }
238      }
239      if (fileStat == null) {
240        throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
241      }
242    } else {
243      throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
244    }
245    return fileStat;
246  }
247
248  /**
249   * Convert the file information in LsEntry to a {@link FileStatus} object. *
250   *
251   * @param sftpFile
252   * @param parentPath
253   * @return file status
254   * @throws IOException
255   */
256  private FileStatus getFileStatus(ChannelSftp channel, LsEntry sftpFile,
257      Path parentPath) throws IOException {
258
259    SftpATTRS attr = sftpFile.getAttrs();
260    long length = attr.getSize();
261    boolean isDir = attr.isDir();
262    boolean isLink = attr.isLink();
263    if (isLink) {
264      String link = parentPath.toUri().getPath() + "/" + sftpFile.getFilename();
265      try {
266        link = channel.realpath(link);
267
268        Path linkParent = new Path("/", link);
269
270        FileStatus fstat = getFileStatus(channel, linkParent);
271        isDir = fstat.isDirectory();
272        length = fstat.getLen();
273      } catch (Exception e) {
274        throw new IOException(e);
275      }
276    }
277    int blockReplication = 1;
278    // Using default block size since there is no way in SFTP channel to know of
279    // block sizes on server. The assumption could be less than ideal.
280    long blockSize = DEFAULT_BLOCK_SIZE;
281    long modTime = attr.getMTime() * 1000; // convert to milliseconds
282    long accessTime = 0;
283    FsPermission permission = getPermissions(sftpFile);
284    // not be able to get the real user group name, just use the user and group
285    // id
286    String user = Integer.toString(attr.getUId());
287    String group = Integer.toString(attr.getGId());
288    Path filePath = new Path(parentPath, sftpFile.getFilename());
289
290    return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
291        accessTime, permission, user, group, filePath.makeQualified(
292            this.getUri(), this.getWorkingDirectory()));
293  }
294
295  /**
296   * Return file permission.
297   *
298   * @param sftpFile
299   * @return file permission
300   */
301  private FsPermission getPermissions(LsEntry sftpFile) {
302    return new FsPermission((short) sftpFile.getAttrs().getPermissions());
303  }
304
305  /**
306   * Convenience method, so that we don't open a new connection when using this
307   * method from within another method. Otherwise every API invocation incurs
308   * the overhead of opening/closing a TCP connection.
309   */
310  private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission)
311      throws IOException {
312    boolean created = true;
313    Path workDir;
314    try {
315      workDir = new Path(client.pwd());
316    } catch (SftpException e) {
317      throw new IOException(e);
318    }
319    Path absolute = makeAbsolute(workDir, file);
320    String pathName = absolute.getName();
321    if (!exists(client, absolute)) {
322      Path parent = absolute.getParent();
323      created =
324          (parent == null || mkdirs(client, parent, FsPermission.getDefault()));
325      if (created) {
326        String parentDir = parent.toUri().getPath();
327        boolean succeeded = true;
328        try {
329          client.cd(parentDir);
330          client.mkdir(pathName);
331        } catch (SftpException e) {
332          throw new IOException(String.format(E_MAKE_DIR_FORPATH, pathName,
333              parentDir));
334        }
335        created = created & succeeded;
336      }
337    } else if (isFile(client, absolute)) {
338      throw new IOException(String.format(E_DIR_CREATE_FROMFILE, absolute));
339    }
340    return created;
341  }
342
343  /**
344   * Convenience method, so that we don't open a new connection when using this
345   * method from within another method. Otherwise every API invocation incurs
346   * the overhead of opening/closing a TCP connection.
347   * @throws IOException
348   */
349  private boolean isFile(ChannelSftp channel, Path file) throws IOException {
350    try {
351      return !getFileStatus(channel, file).isDirectory();
352    } catch (FileNotFoundException e) {
353      return false; // file does not exist
354    } catch (IOException ioe) {
355      throw new IOException(E_FILE_CHECK_FAILED, ioe);
356    }
357  }
358
359  /**
360   * Convenience method, so that we don't open a new connection when using this
361   * method from within another method. Otherwise every API invocation incurs
362   * the overhead of opening/closing a TCP connection.
363   */
364  private boolean delete(ChannelSftp channel, Path file, boolean recursive)
365      throws IOException {
366    Path workDir;
367    try {
368      workDir = new Path(channel.pwd());
369    } catch (SftpException e) {
370      throw new IOException(e);
371    }
372    Path absolute = makeAbsolute(workDir, file);
373    String pathName = absolute.toUri().getPath();
374    FileStatus fileStat = null;
375    try {
376      fileStat = getFileStatus(channel, absolute);
377    } catch (FileNotFoundException e) {
378      // file not found, no need to delete, return true
379      return false;
380    }
381    if (!fileStat.isDirectory()) {
382      boolean status = true;
383      try {
384        channel.rm(pathName);
385      } catch (SftpException e) {
386        status = false;
387      }
388      return status;
389    } else {
390      boolean status = true;
391      FileStatus[] dirEntries = listStatus(channel, absolute);
392      if (dirEntries != null && dirEntries.length > 0) {
393        if (!recursive) {
394          throw new IOException(String.format(E_DIR_NOTEMPTY, file));
395        }
396        for (int i = 0; i < dirEntries.length; ++i) {
397          delete(channel, new Path(absolute, dirEntries[i].getPath()),
398              recursive);
399        }
400      }
401      try {
402        channel.rmdir(pathName);
403      } catch (SftpException e) {
404        status = false;
405      }
406      return status;
407    }
408  }
409
410  /**
411   * Convenience method, so that we don't open a new connection when using this
412   * method from within another method. Otherwise every API invocation incurs
413   * the overhead of opening/closing a TCP connection.
414   */
415  @SuppressWarnings("unchecked")
416  private FileStatus[] listStatus(ChannelSftp client, Path file)
417      throws IOException {
418    Path workDir;
419    try {
420      workDir = new Path(client.pwd());
421    } catch (SftpException e) {
422      throw new IOException(e);
423    }
424    Path absolute = makeAbsolute(workDir, file);
425    FileStatus fileStat = getFileStatus(client, absolute);
426    if (!fileStat.isDirectory()) {
427      return new FileStatus[] {fileStat};
428    }
429    Vector<LsEntry> sftpFiles;
430    try {
431      sftpFiles = (Vector<LsEntry>) client.ls(absolute.toUri().getPath());
432    } catch (SftpException e) {
433      throw new IOException(e);
434    }
435    ArrayList<FileStatus> fileStats = new ArrayList<FileStatus>();
436    for (int i = 0; i < sftpFiles.size(); i++) {
437      LsEntry entry = sftpFiles.get(i);
438      String fname = entry.getFilename();
439      // skip current and parent directory, ie. "." and ".."
440      if (!".".equalsIgnoreCase(fname) && !"..".equalsIgnoreCase(fname)) {
441        fileStats.add(getFileStatus(client, entry, absolute));
442      }
443    }
444    return fileStats.toArray(new FileStatus[fileStats.size()]);
445  }
446
447  /**
448   * Convenience method, so that we don't open a new connection when using this
449   * method from within another method. Otherwise every API invocation incurs
450   * the overhead of opening/closing a TCP connection.
451   *
452   * @param channel
453   * @param src
454   * @param dst
455   * @return rename successful?
456   * @throws IOException
457   */
458  private boolean rename(ChannelSftp channel, Path src, Path dst)
459      throws IOException {
460    Path workDir;
461    try {
462      workDir = new Path(channel.pwd());
463    } catch (SftpException e) {
464      throw new IOException(e);
465    }
466    Path absoluteSrc = makeAbsolute(workDir, src);
467    Path absoluteDst = makeAbsolute(workDir, dst);
468
469    if (!exists(channel, absoluteSrc)) {
470      throw new IOException(String.format(E_SPATH_NOTEXIST, src));
471    }
472    if (exists(channel, absoluteDst)) {
473      throw new IOException(String.format(E_DPATH_EXIST, dst));
474    }
475    boolean renamed = true;
476    try {
477      channel.cd("/");
478      channel.rename(src.toUri().getPath(), dst.toUri().getPath());
479    } catch (SftpException e) {
480      renamed = false;
481    }
482    return renamed;
483  }
484
485  @Override
486  public void initialize(URI uriInfo, Configuration conf) throws IOException {
487    super.initialize(uriInfo, conf);
488
489    setConfigurationFromURI(uriInfo, conf);
490    setConf(conf);
491    this.uri = uriInfo;
492  }
493
494  @Override
495  public URI getUri() {
496    return uri;
497  }
498
499  @Override
500  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
501    ChannelSftp channel = connect();
502    Path workDir;
503    try {
504      workDir = new Path(channel.pwd());
505    } catch (SftpException e) {
506      throw new IOException(e);
507    }
508    Path absolute = makeAbsolute(workDir, f);
509    FileStatus fileStat = getFileStatus(channel, absolute);
510    if (fileStat.isDirectory()) {
511      disconnect(channel);
512      throw new IOException(String.format(E_PATH_DIR, f));
513    }
514    InputStream is;
515    try {
516      // the path could be a symbolic link, so get the real path
517      absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));
518
519      is = channel.get(absolute.toUri().getPath());
520    } catch (SftpException e) {
521      throw new IOException(e);
522    }
523
524    FSDataInputStream fis =
525        new FSDataInputStream(new SFTPInputStream(is, channel, statistics));
526    return fis;
527  }
528
529  /**
530   * A stream obtained via this call must be closed before using other APIs of
531   * this class or else the invocation will block.
532   */
533  @Override
534  public FSDataOutputStream create(Path f, FsPermission permission,
535      boolean overwrite, int bufferSize, short replication, long blockSize,
536      Progressable progress) throws IOException {
537    final ChannelSftp client = connect();
538    Path workDir;
539    try {
540      workDir = new Path(client.pwd());
541    } catch (SftpException e) {
542      throw new IOException(e);
543    }
544    Path absolute = makeAbsolute(workDir, f);
545    if (exists(client, f)) {
546      if (overwrite) {
547        delete(client, f, false);
548      } else {
549        disconnect(client);
550        throw new IOException(String.format(E_FILE_EXIST, f));
551      }
552    }
553    Path parent = absolute.getParent();
554    if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
555      parent = (parent == null) ? new Path("/") : parent;
556      disconnect(client);
557      throw new IOException(String.format(E_CREATE_DIR, parent));
558    }
559    OutputStream os;
560    try {
561      client.cd(parent.toUri().getPath());
562      os = client.put(f.getName());
563    } catch (SftpException e) {
564      throw new IOException(e);
565    }
566    FSDataOutputStream fos = new FSDataOutputStream(os, statistics) {
567      @Override
568      public void close() throws IOException {
569        super.close();
570        disconnect(client);
571      }
572    };
573
574    return fos;
575  }
576
577  @Override
578  public FSDataOutputStream append(Path f, int bufferSize,
579      Progressable progress)
580      throws IOException {
581    throw new IOException(E_NOT_SUPPORTED);
582  }
583
584  /*
585   * The parent of source and destination can be different. It is suppose to
586   * work like 'move'
587   */
588  @Override
589  public boolean rename(Path src, Path dst) throws IOException {
590    ChannelSftp channel = connect();
591    try {
592      boolean success = rename(channel, src, dst);
593      return success;
594    } finally {
595      disconnect(channel);
596    }
597  }
598
599  @Override
600  public boolean delete(Path f, boolean recursive) throws IOException {
601    ChannelSftp channel = connect();
602    try {
603      boolean success = delete(channel, f, recursive);
604      return success;
605    } finally {
606      disconnect(channel);
607    }
608  }
609
610  @Override
611  public FileStatus[] listStatus(Path f) throws IOException {
612    ChannelSftp client = connect();
613    try {
614      FileStatus[] stats = listStatus(client, f);
615      return stats;
616    } finally {
617      disconnect(client);
618    }
619  }
620
621  @Override
622  public void setWorkingDirectory(Path newDir) {
623    // we do not maintain the working directory state
624  }
625
626  @Override
627  public Path getWorkingDirectory() {
628    // Return home directory always since we do not maintain state.
629    return getHomeDirectory();
630  }
631
632  @Override
633  public Path getHomeDirectory() {
634    ChannelSftp channel = null;
635    try {
636      channel = connect();
637      Path homeDir = new Path(channel.pwd());
638      return homeDir;
639    } catch (Exception ioe) {
640      return null;
641    } finally {
642      try {
643        disconnect(channel);
644      } catch (IOException ioe) {
645        return null;
646      }
647    }
648  }
649
650  @Override
651  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
652    ChannelSftp client = connect();
653    try {
654      boolean success = mkdirs(client, f, permission);
655      return success;
656    } finally {
657      disconnect(client);
658    }
659  }
660
661  @Override
662  public FileStatus getFileStatus(Path f) throws IOException {
663    ChannelSftp channel = connect();
664    try {
665      FileStatus status = getFileStatus(channel, f);
666      return status;
667    } finally {
668      disconnect(channel);
669    }
670  }
671}