001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.fs.sftp; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024import java.net.URI; 025import java.net.URLDecoder; 026import java.util.ArrayList; 027import java.util.Vector; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FSDataInputStream; 033import org.apache.hadoop.fs.FSDataOutputStream; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.fs.Path; 037import org.apache.hadoop.fs.permission.FsPermission; 038import org.apache.hadoop.util.Progressable; 039 040import com.jcraft.jsch.ChannelSftp; 041import com.jcraft.jsch.ChannelSftp.LsEntry; 042import com.jcraft.jsch.SftpATTRS; 043import com.jcraft.jsch.SftpException; 044 045/** SFTP FileSystem. */ 046public class SFTPFileSystem extends FileSystem { 047 048 public static final Log LOG = LogFactory.getLog(SFTPFileSystem.class); 049 050 private SFTPConnectionPool connectionPool; 051 private URI uri; 052 053 private static final int DEFAULT_SFTP_PORT = 22; 054 private static final int DEFAULT_MAX_CONNECTION = 5; 055 public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; 056 public static final int DEFAULT_BLOCK_SIZE = 4 * 1024; 057 public static final String FS_SFTP_USER_PREFIX = "fs.sftp.user."; 058 public static final String FS_SFTP_PASSWORD_PREFIX = "fs.sftp.password."; 059 public static final String FS_SFTP_HOST = "fs.sftp.host"; 060 public static final String FS_SFTP_HOST_PORT = "fs.sftp.host.port"; 061 public static final String FS_SFTP_KEYFILE = "fs.sftp.keyfile"; 062 public static final String FS_SFTP_CONNECTION_MAX = "fs.sftp.connection.max"; 063 public static final String E_SAME_DIRECTORY_ONLY = 064 "only same directory renames are supported"; 065 public static final String E_HOST_NULL = "Invalid host specified"; 066 public static final String E_USER_NULL = 067 "No user specified for sftp connection. Expand URI or credential file."; 068 public static final String E_PATH_DIR = "Path %s is a directory."; 069 public static final String E_FILE_STATUS = "Failed to get file status"; 070 public static final String E_FILE_NOTFOUND = "File %s does not exist."; 071 public static final String E_FILE_EXIST = "File already exists: %s"; 072 public static final String E_CREATE_DIR = 073 "create(): Mkdirs failed to create: %s"; 074 public static final String E_DIR_CREATE_FROMFILE = 075 "Can't make directory for path %s since it is a file."; 076 public static final String E_MAKE_DIR_FORPATH = 077 "Can't make directory for path \"%s\" under \"%s\"."; 078 public static final String E_DIR_NOTEMPTY = "Directory: %s is not empty."; 079 public static final String E_FILE_CHECK_FAILED = "File check failed"; 080 public static final String E_NOT_SUPPORTED = "Not supported"; 081 public static final String E_SPATH_NOTEXIST = "Source path %s does not exist"; 082 public static final String E_DPATH_EXIST = 083 "Destination path %s already exist, cannot rename!"; 084 public static final String E_FAILED_GETHOME = "Failed to get home directory"; 085 public static final String E_FAILED_DISCONNECT = "Failed to disconnect"; 086 087 /** 088 * Set configuration from UI. 089 * 090 * @param uri 091 * @param conf 092 * @throws IOException 093 */ 094 private void setConfigurationFromURI(URI uriInfo, Configuration conf) 095 throws IOException { 096 097 // get host information from URI 098 String host = uriInfo.getHost(); 099 host = (host == null) ? conf.get(FS_SFTP_HOST, null) : host; 100 if (host == null) { 101 throw new IOException(E_HOST_NULL); 102 } 103 conf.set(FS_SFTP_HOST, host); 104 105 int port = uriInfo.getPort(); 106 port = (port == -1) 107 ? conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT) 108 : port; 109 conf.setInt(FS_SFTP_HOST_PORT, port); 110 111 // get user/password information from URI 112 String userAndPwdFromUri = uriInfo.getUserInfo(); 113 if (userAndPwdFromUri != null) { 114 String[] userPasswdInfo = userAndPwdFromUri.split(":"); 115 String user = userPasswdInfo[0]; 116 user = URLDecoder.decode(user, "UTF-8"); 117 conf.set(FS_SFTP_USER_PREFIX + host, user); 118 if (userPasswdInfo.length > 1) { 119 conf.set(FS_SFTP_PASSWORD_PREFIX + host + "." + 120 user, userPasswdInfo[1]); 121 } 122 } 123 124 String user = conf.get(FS_SFTP_USER_PREFIX + host); 125 if (user == null || user.equals("")) { 126 throw new IllegalStateException(E_USER_NULL); 127 } 128 129 int connectionMax = 130 conf.getInt(FS_SFTP_CONNECTION_MAX, DEFAULT_MAX_CONNECTION); 131 connectionPool = new SFTPConnectionPool(connectionMax); 132 } 133 134 /** 135 * Connecting by using configuration parameters. 136 * 137 * @return An FTPClient instance 138 * @throws IOException 139 */ 140 private ChannelSftp connect() throws IOException { 141 Configuration conf = getConf(); 142 143 String host = conf.get(FS_SFTP_HOST, null); 144 int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT); 145 String user = conf.get(FS_SFTP_USER_PREFIX + host, null); 146 String pwd = conf.get(FS_SFTP_PASSWORD_PREFIX + host + "." + user, null); 147 String keyFile = conf.get(FS_SFTP_KEYFILE, null); 148 149 ChannelSftp channel = 150 connectionPool.connect(host, port, user, pwd, keyFile); 151 152 return channel; 153 } 154 155 /** 156 * Logout and disconnect the given channel. 157 * 158 * @param client 159 * @throws IOException 160 */ 161 private void disconnect(ChannelSftp channel) throws IOException { 162 connectionPool.disconnect(channel); 163 } 164 165 /** 166 * Resolve against given working directory. 167 * 168 * @param workDir 169 * @param path 170 * @return absolute path 171 */ 172 private Path makeAbsolute(Path workDir, Path path) { 173 if (path.isAbsolute()) { 174 return path; 175 } 176 return new Path(workDir, path); 177 } 178 179 /** 180 * Convenience method, so that we don't open a new connection when using this 181 * method from within another method. Otherwise every API invocation incurs 182 * the overhead of opening/closing a TCP connection. 183 * @throws IOException 184 */ 185 private boolean exists(ChannelSftp channel, Path file) throws IOException { 186 try { 187 getFileStatus(channel, file); 188 return true; 189 } catch (FileNotFoundException fnfe) { 190 return false; 191 } catch (IOException ioe) { 192 throw new IOException(E_FILE_STATUS, ioe); 193 } 194 } 195 196 /** 197 * Convenience method, so that we don't open a new connection when using this 198 * method from within another method. Otherwise every API invocation incurs 199 * the overhead of opening/closing a TCP connection. 200 */ 201 @SuppressWarnings("unchecked") 202 private FileStatus getFileStatus(ChannelSftp client, Path file) 203 throws IOException { 204 FileStatus fileStat = null; 205 Path workDir; 206 try { 207 workDir = new Path(client.pwd()); 208 } catch (SftpException e) { 209 throw new IOException(e); 210 } 211 Path absolute = makeAbsolute(workDir, file); 212 Path parentPath = absolute.getParent(); 213 if (parentPath == null) { // root directory 214 long length = -1; // Length of root directory on server not known 215 boolean isDir = true; 216 int blockReplication = 1; 217 long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known. 218 long modTime = -1; // Modification time of root directory not known. 219 Path root = new Path("/"); 220 return new FileStatus(length, isDir, blockReplication, blockSize, 221 modTime, 222 root.makeQualified(this.getUri(), this.getWorkingDirectory())); 223 } 224 String pathName = parentPath.toUri().getPath(); 225 Vector<LsEntry> sftpFiles; 226 try { 227 sftpFiles = (Vector<LsEntry>) client.ls(pathName); 228 } catch (SftpException e) { 229 throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file)); 230 } 231 if (sftpFiles != null) { 232 for (LsEntry sftpFile : sftpFiles) { 233 if (sftpFile.getFilename().equals(file.getName())) { 234 // file found in directory 235 fileStat = getFileStatus(client, sftpFile, parentPath); 236 break; 237 } 238 } 239 if (fileStat == null) { 240 throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file)); 241 } 242 } else { 243 throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file)); 244 } 245 return fileStat; 246 } 247 248 /** 249 * Convert the file information in LsEntry to a {@link FileStatus} object. * 250 * 251 * @param sftpFile 252 * @param parentPath 253 * @return file status 254 * @throws IOException 255 */ 256 private FileStatus getFileStatus(ChannelSftp channel, LsEntry sftpFile, 257 Path parentPath) throws IOException { 258 259 SftpATTRS attr = sftpFile.getAttrs(); 260 long length = attr.getSize(); 261 boolean isDir = attr.isDir(); 262 boolean isLink = attr.isLink(); 263 if (isLink) { 264 String link = parentPath.toUri().getPath() + "/" + sftpFile.getFilename(); 265 try { 266 link = channel.realpath(link); 267 268 Path linkParent = new Path("/", link); 269 270 FileStatus fstat = getFileStatus(channel, linkParent); 271 isDir = fstat.isDirectory(); 272 length = fstat.getLen(); 273 } catch (Exception e) { 274 throw new IOException(e); 275 } 276 } 277 int blockReplication = 1; 278 // Using default block size since there is no way in SFTP channel to know of 279 // block sizes on server. The assumption could be less than ideal. 280 long blockSize = DEFAULT_BLOCK_SIZE; 281 long modTime = attr.getMTime() * 1000; // convert to milliseconds 282 long accessTime = 0; 283 FsPermission permission = getPermissions(sftpFile); 284 // not be able to get the real user group name, just use the user and group 285 // id 286 String user = Integer.toString(attr.getUId()); 287 String group = Integer.toString(attr.getGId()); 288 Path filePath = new Path(parentPath, sftpFile.getFilename()); 289 290 return new FileStatus(length, isDir, blockReplication, blockSize, modTime, 291 accessTime, permission, user, group, filePath.makeQualified( 292 this.getUri(), this.getWorkingDirectory())); 293 } 294 295 /** 296 * Return file permission. 297 * 298 * @param sftpFile 299 * @return file permission 300 */ 301 private FsPermission getPermissions(LsEntry sftpFile) { 302 return new FsPermission((short) sftpFile.getAttrs().getPermissions()); 303 } 304 305 /** 306 * Convenience method, so that we don't open a new connection when using this 307 * method from within another method. Otherwise every API invocation incurs 308 * the overhead of opening/closing a TCP connection. 309 */ 310 private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission) 311 throws IOException { 312 boolean created = true; 313 Path workDir; 314 try { 315 workDir = new Path(client.pwd()); 316 } catch (SftpException e) { 317 throw new IOException(e); 318 } 319 Path absolute = makeAbsolute(workDir, file); 320 String pathName = absolute.getName(); 321 if (!exists(client, absolute)) { 322 Path parent = absolute.getParent(); 323 created = 324 (parent == null || mkdirs(client, parent, FsPermission.getDefault())); 325 if (created) { 326 String parentDir = parent.toUri().getPath(); 327 boolean succeeded = true; 328 try { 329 client.cd(parentDir); 330 client.mkdir(pathName); 331 } catch (SftpException e) { 332 throw new IOException(String.format(E_MAKE_DIR_FORPATH, pathName, 333 parentDir)); 334 } 335 created = created & succeeded; 336 } 337 } else if (isFile(client, absolute)) { 338 throw new IOException(String.format(E_DIR_CREATE_FROMFILE, absolute)); 339 } 340 return created; 341 } 342 343 /** 344 * Convenience method, so that we don't open a new connection when using this 345 * method from within another method. Otherwise every API invocation incurs 346 * the overhead of opening/closing a TCP connection. 347 * @throws IOException 348 */ 349 private boolean isFile(ChannelSftp channel, Path file) throws IOException { 350 try { 351 return !getFileStatus(channel, file).isDirectory(); 352 } catch (FileNotFoundException e) { 353 return false; // file does not exist 354 } catch (IOException ioe) { 355 throw new IOException(E_FILE_CHECK_FAILED, ioe); 356 } 357 } 358 359 /** 360 * Convenience method, so that we don't open a new connection when using this 361 * method from within another method. Otherwise every API invocation incurs 362 * the overhead of opening/closing a TCP connection. 363 */ 364 private boolean delete(ChannelSftp channel, Path file, boolean recursive) 365 throws IOException { 366 Path workDir; 367 try { 368 workDir = new Path(channel.pwd()); 369 } catch (SftpException e) { 370 throw new IOException(e); 371 } 372 Path absolute = makeAbsolute(workDir, file); 373 String pathName = absolute.toUri().getPath(); 374 FileStatus fileStat = null; 375 try { 376 fileStat = getFileStatus(channel, absolute); 377 } catch (FileNotFoundException e) { 378 // file not found, no need to delete, return true 379 return false; 380 } 381 if (!fileStat.isDirectory()) { 382 boolean status = true; 383 try { 384 channel.rm(pathName); 385 } catch (SftpException e) { 386 status = false; 387 } 388 return status; 389 } else { 390 boolean status = true; 391 FileStatus[] dirEntries = listStatus(channel, absolute); 392 if (dirEntries != null && dirEntries.length > 0) { 393 if (!recursive) { 394 throw new IOException(String.format(E_DIR_NOTEMPTY, file)); 395 } 396 for (int i = 0; i < dirEntries.length; ++i) { 397 delete(channel, new Path(absolute, dirEntries[i].getPath()), 398 recursive); 399 } 400 } 401 try { 402 channel.rmdir(pathName); 403 } catch (SftpException e) { 404 status = false; 405 } 406 return status; 407 } 408 } 409 410 /** 411 * Convenience method, so that we don't open a new connection when using this 412 * method from within another method. Otherwise every API invocation incurs 413 * the overhead of opening/closing a TCP connection. 414 */ 415 @SuppressWarnings("unchecked") 416 private FileStatus[] listStatus(ChannelSftp client, Path file) 417 throws IOException { 418 Path workDir; 419 try { 420 workDir = new Path(client.pwd()); 421 } catch (SftpException e) { 422 throw new IOException(e); 423 } 424 Path absolute = makeAbsolute(workDir, file); 425 FileStatus fileStat = getFileStatus(client, absolute); 426 if (!fileStat.isDirectory()) { 427 return new FileStatus[] {fileStat}; 428 } 429 Vector<LsEntry> sftpFiles; 430 try { 431 sftpFiles = (Vector<LsEntry>) client.ls(absolute.toUri().getPath()); 432 } catch (SftpException e) { 433 throw new IOException(e); 434 } 435 ArrayList<FileStatus> fileStats = new ArrayList<FileStatus>(); 436 for (int i = 0; i < sftpFiles.size(); i++) { 437 LsEntry entry = sftpFiles.get(i); 438 String fname = entry.getFilename(); 439 // skip current and parent directory, ie. "." and ".." 440 if (!".".equalsIgnoreCase(fname) && !"..".equalsIgnoreCase(fname)) { 441 fileStats.add(getFileStatus(client, entry, absolute)); 442 } 443 } 444 return fileStats.toArray(new FileStatus[fileStats.size()]); 445 } 446 447 /** 448 * Convenience method, so that we don't open a new connection when using this 449 * method from within another method. Otherwise every API invocation incurs 450 * the overhead of opening/closing a TCP connection. 451 * 452 * @param channel 453 * @param src 454 * @param dst 455 * @return rename successful? 456 * @throws IOException 457 */ 458 private boolean rename(ChannelSftp channel, Path src, Path dst) 459 throws IOException { 460 Path workDir; 461 try { 462 workDir = new Path(channel.pwd()); 463 } catch (SftpException e) { 464 throw new IOException(e); 465 } 466 Path absoluteSrc = makeAbsolute(workDir, src); 467 Path absoluteDst = makeAbsolute(workDir, dst); 468 469 if (!exists(channel, absoluteSrc)) { 470 throw new IOException(String.format(E_SPATH_NOTEXIST, src)); 471 } 472 if (exists(channel, absoluteDst)) { 473 throw new IOException(String.format(E_DPATH_EXIST, dst)); 474 } 475 boolean renamed = true; 476 try { 477 channel.cd("/"); 478 channel.rename(src.toUri().getPath(), dst.toUri().getPath()); 479 } catch (SftpException e) { 480 renamed = false; 481 } 482 return renamed; 483 } 484 485 @Override 486 public void initialize(URI uriInfo, Configuration conf) throws IOException { 487 super.initialize(uriInfo, conf); 488 489 setConfigurationFromURI(uriInfo, conf); 490 setConf(conf); 491 this.uri = uriInfo; 492 } 493 494 @Override 495 public URI getUri() { 496 return uri; 497 } 498 499 @Override 500 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 501 ChannelSftp channel = connect(); 502 Path workDir; 503 try { 504 workDir = new Path(channel.pwd()); 505 } catch (SftpException e) { 506 throw new IOException(e); 507 } 508 Path absolute = makeAbsolute(workDir, f); 509 FileStatus fileStat = getFileStatus(channel, absolute); 510 if (fileStat.isDirectory()) { 511 disconnect(channel); 512 throw new IOException(String.format(E_PATH_DIR, f)); 513 } 514 InputStream is; 515 try { 516 // the path could be a symbolic link, so get the real path 517 absolute = new Path("/", channel.realpath(absolute.toUri().getPath())); 518 519 is = channel.get(absolute.toUri().getPath()); 520 } catch (SftpException e) { 521 throw new IOException(e); 522 } 523 524 FSDataInputStream fis = 525 new FSDataInputStream(new SFTPInputStream(is, channel, statistics)); 526 return fis; 527 } 528 529 /** 530 * A stream obtained via this call must be closed before using other APIs of 531 * this class or else the invocation will block. 532 */ 533 @Override 534 public FSDataOutputStream create(Path f, FsPermission permission, 535 boolean overwrite, int bufferSize, short replication, long blockSize, 536 Progressable progress) throws IOException { 537 final ChannelSftp client = connect(); 538 Path workDir; 539 try { 540 workDir = new Path(client.pwd()); 541 } catch (SftpException e) { 542 throw new IOException(e); 543 } 544 Path absolute = makeAbsolute(workDir, f); 545 if (exists(client, f)) { 546 if (overwrite) { 547 delete(client, f, false); 548 } else { 549 disconnect(client); 550 throw new IOException(String.format(E_FILE_EXIST, f)); 551 } 552 } 553 Path parent = absolute.getParent(); 554 if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) { 555 parent = (parent == null) ? new Path("/") : parent; 556 disconnect(client); 557 throw new IOException(String.format(E_CREATE_DIR, parent)); 558 } 559 OutputStream os; 560 try { 561 client.cd(parent.toUri().getPath()); 562 os = client.put(f.getName()); 563 } catch (SftpException e) { 564 throw new IOException(e); 565 } 566 FSDataOutputStream fos = new FSDataOutputStream(os, statistics) { 567 @Override 568 public void close() throws IOException { 569 super.close(); 570 disconnect(client); 571 } 572 }; 573 574 return fos; 575 } 576 577 @Override 578 public FSDataOutputStream append(Path f, int bufferSize, 579 Progressable progress) 580 throws IOException { 581 throw new IOException(E_NOT_SUPPORTED); 582 } 583 584 /* 585 * The parent of source and destination can be different. It is suppose to 586 * work like 'move' 587 */ 588 @Override 589 public boolean rename(Path src, Path dst) throws IOException { 590 ChannelSftp channel = connect(); 591 try { 592 boolean success = rename(channel, src, dst); 593 return success; 594 } finally { 595 disconnect(channel); 596 } 597 } 598 599 @Override 600 public boolean delete(Path f, boolean recursive) throws IOException { 601 ChannelSftp channel = connect(); 602 try { 603 boolean success = delete(channel, f, recursive); 604 return success; 605 } finally { 606 disconnect(channel); 607 } 608 } 609 610 @Override 611 public FileStatus[] listStatus(Path f) throws IOException { 612 ChannelSftp client = connect(); 613 try { 614 FileStatus[] stats = listStatus(client, f); 615 return stats; 616 } finally { 617 disconnect(client); 618 } 619 } 620 621 @Override 622 public void setWorkingDirectory(Path newDir) { 623 // we do not maintain the working directory state 624 } 625 626 @Override 627 public Path getWorkingDirectory() { 628 // Return home directory always since we do not maintain state. 629 return getHomeDirectory(); 630 } 631 632 @Override 633 public Path getHomeDirectory() { 634 ChannelSftp channel = null; 635 try { 636 channel = connect(); 637 Path homeDir = new Path(channel.pwd()); 638 return homeDir; 639 } catch (Exception ioe) { 640 return null; 641 } finally { 642 try { 643 disconnect(channel); 644 } catch (IOException ioe) { 645 return null; 646 } 647 } 648 } 649 650 @Override 651 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 652 ChannelSftp client = connect(); 653 try { 654 boolean success = mkdirs(client, f, permission); 655 return success; 656 } finally { 657 disconnect(client); 658 } 659 } 660 661 @Override 662 public FileStatus getFileStatus(Path f) throws IOException { 663 ChannelSftp channel = connect(); 664 try { 665 FileStatus status = getFileStatus(channel, f); 666 return status; 667 } finally { 668 disconnect(channel); 669 } 670 } 671}