001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.fs.ftp; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InputStream; 023import java.net.ConnectException; 024import java.net.URI; 025 026import com.google.common.base.Preconditions; 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.apache.commons.net.ftp.FTP; 030import org.apache.commons.net.ftp.FTPClient; 031import org.apache.commons.net.ftp.FTPFile; 032import org.apache.commons.net.ftp.FTPReply; 033import org.apache.hadoop.classification.InterfaceAudience; 034import org.apache.hadoop.classification.InterfaceStability; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FSDataInputStream; 037import org.apache.hadoop.fs.FSDataOutputStream; 038import org.apache.hadoop.fs.FileAlreadyExistsException; 039import org.apache.hadoop.fs.FileStatus; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.ParentNotDirectoryException; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.fs.permission.FsAction; 044import 
org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A {@link FileSystem} backed by an FTP client provided by <a
 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
 * </p>
 * <p>
 * No connection state is kept between calls: every public operation opens a
 * fresh {@link FTPClient} via {@link #connect()} and releases it before
 * returning, except {@link #open} and {@link #create}, whose connection is
 * released when the returned stream is closed.
 * </p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FTPFileSystem extends FileSystem {

  public static final Log LOG = LogFactory
      .getLog(FTPFileSystem.class);

  /** Buffer size handed to {@link FTPClient#setBufferSize(int)}. */
  public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;

  /**
   * Block size reported in every {@link FileStatus}; FTP exposes no block
   * size, so this is a fixed placeholder.
   */
  public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
  public static final String FS_FTP_USER_PREFIX = "fs.ftp.user.";
  public static final String FS_FTP_HOST = "fs.ftp.host";
  public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
  public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
  public static final String E_SAME_DIRECTORY_ONLY =
      "only same directory renames are supported";

  private URI uri;

  /**
   * Return the protocol scheme for the FileSystem.
   * <p/>
   *
   * @return <code>ftp</code>
   */
  @Override
  public String getScheme() {
    return "ftp";
  }

  /**
   * Get the default port for this FTPFileSystem.
   *
   * @return the default port
   */
  @Override
  protected int getDefaultPort() {
    return FTP.DEFAULT_PORT;
  }

  /**
   * Initialize the file system from its URI and configuration.
   * <p>
   * The host, port, user and password are resolved and written back into
   * {@code conf} under the {@code fs.ftp.*} keys, which is where
   * {@link #connect()} reads them from.
   *
   * @param uri  file system URI, e.g. {@code ftp://user:password@host:port/}
   * @param conf configuration; it is mutated with the resolved settings
   * @throws IOException if neither the URI nor {@code conf} supplies a host
   * @throws IllegalStateException if no user/password pair can be extracted
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    // get host information from uri (overrides info in conf)
    String host = uri.getHost();
    host = (host == null) ? conf.get(FS_FTP_HOST, null) : host;
    if (host == null) {
      throw new IOException("Invalid host specified");
    }
    conf.set(FS_FTP_HOST, host);

    // A port in the URI wins; otherwise the FTP default port is used. Any
    // value already stored under FS_FTP_HOST_PORT is overwritten either way.
    int port = uri.getPort();
    port = (port == -1) ? FTP.DEFAULT_PORT : port;
    conf.setInt(FS_FTP_HOST_PORT, port);

    // get user/password information from URI (overrides info in conf)
    String userAndPassword = uri.getUserInfo();
    if (userAndPassword == null) {
      // NOTE(review): if the credentials are not configured either, this
      // yields the literal string "null:null" and passes the check below.
      userAndPassword = (conf.get(FS_FTP_USER_PREFIX + host, null) + ":" + conf
          .get(FS_FTP_PASSWORD_PREFIX + host, null));
    }
    String[] userPasswdInfo = userAndPassword.split(":");
    Preconditions.checkState(userPasswdInfo.length > 1,
        "Invalid username / password");
    conf.set(FS_FTP_USER_PREFIX + host, userPasswdInfo[0]);
    conf.set(FS_FTP_PASSWORD_PREFIX + host, userPasswdInfo[1]);
    setConf(conf);
    this.uri = uri;
  }

  /**
   * Connect and log in to the FTP server described by the configuration
   * written by {@link #initialize(URI, Configuration)}.
   * <p>
   * On any failure the underlying socket is closed before the exception
   * propagates, so a failed attempt does not leak a connection.
   *
   * @return a connected, logged-in FTPClient in binary, block transfer mode
   * @throws IOException if the server rejects the connection or the login
   */
  private FTPClient connect() throws IOException {
    Configuration conf = getConf();
    String host = conf.get(FS_FTP_HOST);
    int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
    String user = conf.get(FS_FTP_USER_PREFIX + host);
    String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
    FTPClient client = new FTPClient();
    boolean ready = false;
    try {
      client.connect(host, port);
      int reply = client.getReplyCode();
      if (!FTPReply.isPositiveCompletion(reply)) {
        throw NetUtils.wrapException(host, port,
            NetUtils.UNKNOWN_HOST, 0,
            new ConnectException("Server response " + reply));
      } else if (client.login(user, password)) {
        client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
        client.setFileType(FTP.BINARY_FILE_TYPE);
        client.setBufferSize(DEFAULT_BUFFER_SIZE);
      } else {
        throw new IOException("Login failed on server - " + host + ", port - "
            + port + " as user '" + user + "'");
      }
      ready = true;
      return client;
    } finally {
      // Previously a rejected greeting or failed login left the socket open.
      if (!ready && client.isConnected()) {
        try {
          client.disconnect();
        } catch (IOException e) {
          LOG.warn("Failed to disconnect after unsuccessful connection to "
              + host, e);
        }
      }
    }
  }

  /**
   * Logout and disconnect the given FTPClient. A {@code null} client is
   * ignored.
   *
   * @param client the client to release, may be {@code null}
   * @throws IOException if logout or disconnect fails at the I/O level
   * @throws FTPException if the client is not connected
   */
  private void disconnect(FTPClient client) throws IOException {
    if (client != null) {
      if (!client.isConnected()) {
        throw new FTPException("Client not connected");
      }
      boolean logoutSuccess = client.logout();
      client.disconnect();
      if (!logoutSuccess) {
        LOG.warn("Logout failed while disconnecting, error code - "
            + client.getReplyCode());
      }
    }
  }

  /**
   * Best-effort {@link #disconnect(FTPClient)} for error paths. A failure is
   * logged instead of thrown, so that it cannot mask the exception that
   * triggered the cleanup.
   *
   * @param client the client to release, may be {@code null}
   */
  private void disconnectQuietly(FTPClient client) {
    try {
      disconnect(client);
    } catch (IOException e) {
      LOG.warn("Failed to disconnect FTP client", e);
    } catch (RuntimeException e) {
      LOG.warn("Failed to disconnect FTP client", e);
    }
  }

  /**
   * Resolve a path against the given working directory.
   *
   * @param workDir the directory relative paths are resolved against
   * @param path    the path to resolve
   * @return {@code path} itself if it is absolute, else {@code workDir/path}
   */
  private Path makeAbsolute(Path workDir, Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workDir, path);
  }

  /**
   * Open a file for reading. The FTP connection stays open until the
   * returned stream is closed; if opening fails, the connection is released
   * before the exception propagates.
   *
   * @throws FileNotFoundException if the path is missing or is a directory
   * @throws IOException if the server refuses to start the transfer
   */
  @Override
  public FSDataInputStream open(Path file, int bufferSize) throws IOException {
    FTPClient client = connect();
    InputStream is;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      FileStatus fileStat = getFileStatus(client, absolute);
      if (fileStat.isDirectory()) {
        throw new FileNotFoundException("Path " + file + " is a directory.");
      }
      client.allocate(bufferSize);
      Path parent = absolute.getParent();
      // Change to parent directory on the server. Only then can we read the
      // file on the server by opening up an InputStream. As a side effect the
      // working directory on the server is changed to the parent directory of
      // the file. The FTP client connection is closed when close() is called
      // on the FSDataInputStream.
      client.changeWorkingDirectory(parent.toUri().getPath());
      is = client.retrieveFileStream(file.getName());
      if (is == null) {
        // No data connection was opened, so there is no pending transfer to
        // complete: just release the control connection (done in the catch).
        throw new IOException("Unable to open file: " + file + ", Aborting");
      }
    } catch (IOException e) {
      disconnectQuietly(client);
      throw e;
    } catch (RuntimeException e) {
      disconnectQuietly(client);
      throw e;
    }
    FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
        client, statistics));
    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
      // The ftpClient is an inconsistent state. Must close the stream
      // which in turn will logout and disconnect from FTP server
      fis.close();
      throw new IOException("Unable to open file: " + file + ", Aborting");
    }
    return fis;
  }

  /**
   * A stream obtained via this call must be closed before using other APIs of
   * this class or else the invocation will block.
   * <p>
   * Missing parent directories are created. If setting up the upload fails,
   * the FTP connection is released before the exception propagates.
   *
   * @throws FileAlreadyExistsException if the path exists and either
   *         {@code overwrite} is false or the path is a directory
   * @throws IOException if parent creation fails or the server refuses the
   *         upload
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    final FTPClient client = connect();
    java.io.OutputStream out;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      FileStatus status;
      try {
        status = getFileStatus(client, file);
      } catch (FileNotFoundException fnfe) {
        status = null;
      }
      if (status != null) {
        if (overwrite && !status.isDirectory()) {
          delete(client, file, false);
        } else {
          throw new FileAlreadyExistsException("File already exists: " + file);
        }
      }

      Path parent = absolute.getParent();
      if (parent == null
          || !mkdirs(client, parent, FsPermission.getDirDefault())) {
        parent = (parent == null) ? new Path("/") : parent;
        throw new IOException("create(): Mkdirs failed to create: " + parent);
      }
      client.allocate(bufferSize);
      // Change to parent directory on the server. Only then can we write to
      // the file on the server by opening up an OutputStream. As a side effect
      // the working directory on the server is changed to the parent directory
      // of the file. The FTP client connection is closed when close() is
      // called on the FSDataOutputStream.
      client.changeWorkingDirectory(parent.toUri().getPath());
      out = client.storeFileStream(file.getName());
      if (out == null) {
        // Previously this null was wrapped and then close()d, which failed on
        // the null stream and tried to complete a transfer that never began.
        throw new IOException("Unable to create file: " + file + ", Aborting");
      }
    } catch (IOException e) {
      disconnectQuietly(client);
      throw e;
    } catch (RuntimeException e) {
      disconnectQuietly(client);
      throw e;
    }
    FSDataOutputStream fos = new FSDataOutputStream(out, statistics) {
      @Override
      public void close() throws IOException {
        super.close();
        if (!client.isConnected()) {
          throw new FTPException("Client not connected");
        }
        // Finish the STOR command before releasing the connection.
        boolean cmdCompleted = client.completePendingCommand();
        disconnect(client);
        if (!cmdCompleted) {
          throw new FTPException("Could not complete transfer, Reply Code - "
              + client.getReplyCode());
        }
      }
    };
    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
      // The ftpClient is an inconsistent state. Must close the stream
      // which in turn will logout and disconnect from FTP server
      fos.close();
      throw new IOException("Unable to create file: " + file + ", Aborting");
    }
    return fos;
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return true if the path exists on the server
   * @throws IOException on IO problems other than FileNotFoundException
   */
  private boolean exists(FTPClient client, Path file) throws IOException {
    try {
      getFileStatus(client, file);
      return true;
    } catch (FileNotFoundException fnfe) {
      return false;
    }
  }

  /**
   * Connection-reusing counterpart of {@link #isDirectory(Path)}. It avoids
   * opening a second FTP connection while one is already in use.
   *
   * @return true if the path exists and is a directory
   * @throws IOException on IO problems other than FileNotFoundException
   */
  private boolean isDirectory(FTPClient client, Path file) throws IOException {
    try {
      return getFileStatus(client, file).isDirectory();
    } catch (FileNotFoundException fnfe) {
      return false;
    }
  }

  @Override
  public boolean delete(Path file, boolean recursive) throws IOException {
    FTPClient client = connect();
    try {
      boolean success = delete(client, file, recursive);
      return success;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return false if the path does not exist, otherwise the server's result
   *         for the final delete/remove-directory command
   * @throws IOException if a non-empty directory is deleted non-recursively
   */
  private boolean delete(FTPClient client, Path file, boolean recursive)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.toUri().getPath();
    try {
      FileStatus fileStat = getFileStatus(client, absolute);
      if (fileStat.isFile()) {
        return client.deleteFile(pathName);
      }
    } catch (FileNotFoundException e) {
      //the file is not there
      return false;
    }
    FileStatus[] dirEntries = listStatus(client, absolute);
    if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
      throw new IOException("Directory: " + file + " is not empty.");
    }
    // Remove the children first; the directory itself must be empty before
    // the server will remove it.
    for (FileStatus dirEntry : dirEntries) {
      delete(client, new Path(absolute, dirEntry.getPath()), recursive);
    }
    return client.removeDirectory(pathName);
  }

  /**
   * Map the read/write/execute bits of one access group of an FTPFile to an
   * FsAction.
   *
   * @param accessGroup one of FTPFile.USER_ACCESS / GROUP_ACCESS / WORLD_ACCESS
   * @param ftpFile     the listing entry to inspect
   * @return the combined action
   */
  private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
    FsAction action = FsAction.NONE;
    // FsAction is an immutable enum: or() returns the combined value, so the
    // result must be reassigned (previously it was discarded, yielding NONE).
    if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
      action = action.or(FsAction.READ);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
      action = action.or(FsAction.WRITE);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
      action = action.or(FsAction.EXECUTE);
    }
    return action;
  }

  /** Build the user/group/other FsPermission for a listing entry. */
  private FsPermission getPermissions(FTPFile ftpFile) {
    FsAction user, group, others;
    user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
    group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
    others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
    return new FsPermission(user, group, others);
  }

  @Override
  public URI getUri() {
    return uri;
  }

  @Override
  public FileStatus[] listStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      FileStatus[] stats = listStatus(client, file);
      return stats;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return a single-element array for a file, else the directory's entries
   * @throws FileNotFoundException if the path does not exist
   */
  private FileStatus[] listStatus(FTPClient client, Path file)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    FileStatus fileStat = getFileStatus(client, absolute);
    if (fileStat.isFile()) {
      return new FileStatus[] { fileStat };
    }
    FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
    FileStatus[] fileStats = new FileStatus[ftpFiles.length];
    for (int i = 0; i < ftpFiles.length; i++) {
      fileStats[i] = getFileStatus(ftpFiles[i], absolute);
    }
    return fileStats;
  }

  @Override
  public FileStatus getFileStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      FileStatus status = getFileStatus(client, file);
      return status;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * The entry is found by listing the parent directory and matching by name.
   * The root directory gets a synthetic status with unknown length and
   * modification time (-1).
   *
   * @throws FileNotFoundException if no entry with that name is listed
   */
  private FileStatus getFileStatus(FTPClient client, Path file)
      throws IOException {
    FileStatus fileStat = null;
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    Path parentPath = absolute.getParent();
    if (parentPath == null) { // root dir
      long length = -1; // Length of root dir on server not known
      boolean isDir = true;
      int blockReplication = 1;
      long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
      long modTime = -1; // Modification time of root dir not known.
      Path root = new Path("/");
      return new FileStatus(length, isDir, blockReplication, blockSize,
          modTime, root.makeQualified(this));
    }
    String pathName = parentPath.toUri().getPath();
    FTPFile[] ftpFiles = client.listFiles(pathName);
    if (ftpFiles != null) {
      for (FTPFile ftpFile : ftpFiles) {
        if (ftpFile.getName().equals(file.getName())) { // file found in dir
          fileStat = getFileStatus(ftpFile, parentPath);
          break;
        }
      }
      if (fileStat == null) {
        throw new FileNotFoundException("File " + file + " does not exist.");
      }
    } else {
      throw new FileNotFoundException("File " + file + " does not exist.");
    }
    return fileStat;
  }

  /**
   * Convert the file information in FTPFile to a {@link FileStatus} object.
   *
   * @param ftpFile    the listing entry
   * @param parentPath the directory the entry was listed from
   * @return FileStatus with a qualified path, replication 1 and
   *         {@link #DEFAULT_BLOCK_SIZE}
   */
  private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
    long length = ftpFile.getSize();
    boolean isDir = ftpFile.isDirectory();
    int blockReplication = 1;
    // Using default block size since there is no way in FTP client to know of
    // block sizes on server. The assumption could be less than ideal.
    long blockSize = DEFAULT_BLOCK_SIZE;
    long modTime = ftpFile.getTimestamp().getTimeInMillis();
    long accessTime = 0;
    FsPermission permission = getPermissions(ftpFile);
    String user = ftpFile.getUser();
    String group = ftpFile.getGroup();
    Path filePath = new Path(parentPath, ftpFile.getName());
    return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
        accessTime, permission, user, group, filePath.makeQualified(this));
  }

  @Override
  public boolean mkdirs(Path file, FsPermission permission) throws IOException {
    FTPClient client = connect();
    try {
      boolean success = mkdirs(client, file, permission);
      return success;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * Missing ancestors are created recursively with the default directory
   * permission. The {@code permission} argument is not applied on the server.
   *
   * @return true if the directory exists or was created
   * @throws ParentNotDirectoryException if the path exists as a file
   */
  private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
      throws IOException {
    boolean created = true;
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.getName();
    if (!exists(client, absolute)) {
      Path parent = absolute.getParent();
      created = (parent == null || mkdirs(client, parent, FsPermission
          .getDirDefault()));
      if (created) {
        // makeDirectory is given a bare name, so cd into the parent first.
        String parentDir = parent.toUri().getPath();
        client.changeWorkingDirectory(parentDir);
        created = client.makeDirectory(pathName);
      }
    } else if (isFile(client, absolute)) {
      throw new ParentNotDirectoryException(String.format(
          "Can't make directory for path %s since it is a file.", absolute));
    }
    return created;
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return true if the path exists and is a file
   * @throws FTPException wrapping any IOException other than
   *         FileNotFoundException
   */
  private boolean isFile(FTPClient client, Path file) {
    try {
      return getFileStatus(client, file).isFile();
    } catch (FileNotFoundException e) {
      return false; // file does not exist
    } catch (IOException ioe) {
      throw new FTPException("File check failed", ioe);
    }
  }

  /*
   * Assuming that parent of both source and destination is the same. Is the
   * assumption correct or it is suppose to work like 'move' ?
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    FTPClient client = connect();
    try {
      boolean success = rename(client, src, dst);
      return success;
    } finally {
      disconnect(client);
    }
  }

  /**
   * Probe for a path being a parent of another
   * @param parent parent path
   * @param child possible child path
   * @return true if the parent's path matches the start of the child's
   */
  private boolean isParentOf(Path parent, Path child) {
    URI parentURI = parent.toUri();
    String parentPath = parentURI.getPath();
    if (!parentPath.endsWith("/")) {
      parentPath += "/";
    }
    URI childURI = child.toUri();
    String childPath = childURI.getPath();
    return childPath.startsWith(parentPath);
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * If {@code dst} is an existing directory, the source is renamed to
   * {@code dst/srcName}. Only renames within the same parent directory are
   * supported.
   *
   * @param client connected FTP client
   * @param src    path to rename
   * @param dst    destination path or existing destination directory
   * @return the server's result for the rename command
   * @throws FileNotFoundException if {@code src} does not exist
   * @throws FileAlreadyExistsException if the resolved destination exists
   * @throws IOException if the rename would cross directories, would move a
   *         path under itself, or targets the root directory
   */
  private boolean rename(FTPClient client, Path src, Path dst)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absoluteSrc = makeAbsolute(workDir, src);
    Path absoluteDst = makeAbsolute(workDir, dst);
    if (!exists(client, absoluteSrc)) {
      throw new FileNotFoundException("Source path " + src + " does not exist");
    }
    if (absoluteSrc.getParent() == null) {
      throw new IOException("Cannot rename the root directory");
    }
    // Reuse the open connection instead of FileSystem.isDirectory(Path),
    // which would open a second FTP connection.
    if (isDirectory(client, absoluteDst)) {
      // destination is a directory: rename goes underneath it with the
      // source name
      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
    }
    if (exists(client, absoluteDst)) {
      throw new FileAlreadyExistsException("Destination path " + dst
          + " already exists");
    }
    // Compare and cd using the server-side path only. toUri().toString()
    // would keep the scheme/authority of qualified paths (ftp://host/dir),
    // which is not a directory the server understands.
    String parentSrc = absoluteSrc.getParent().toUri().getPath();
    String parentDst = absoluteDst.getParent().toUri().getPath();
    if (isParentOf(absoluteSrc, absoluteDst)) {
      throw new IOException("Cannot rename " + absoluteSrc + " under itself"
          + " : "+ absoluteDst);
    }

    if (!parentSrc.equals(parentDst)) {
      throw new IOException("Cannot rename source: " + absoluteSrc
          + " to " + absoluteDst
          + " -"+ E_SAME_DIRECTORY_ONLY);
    }
    String from = absoluteSrc.getName();
    String to = absoluteDst.getName();
    client.changeWorkingDirectory(parentSrc);
    boolean renamed = client.rename(from, to);
    return renamed;
  }

  @Override
  public Path getWorkingDirectory() {
    // Return home directory always since we do not maintain state.
    return getHomeDirectory();
  }

  /**
   * Return the server-side working directory of a freshly logged-in session.
   *
   * @throws FTPException if connecting, querying or disconnecting fails
   */
  @Override
  public Path getHomeDirectory() {
    FTPClient client = null;
    try {
      client = connect();
      Path homeDir = new Path(client.printWorkingDirectory());
      return homeDir;
    } catch (IOException ioe) {
      throw new FTPException("Failed to get home directory", ioe);
    } finally {
      try {
        disconnect(client);
      } catch (IOException ioe) {
        throw new FTPException("Failed to disconnect", ioe);
      }
    }
  }

  @Override
  public void setWorkingDirectory(Path newDir) {
    // we do not maintain the working directory state
  }
}