/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.ftp;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A {@link FileSystem} backed by an FTP client provided by <a
 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
 * </p>
 * <p>
 * Every public operation opens its own FTP connection via {@code connect()}
 * and closes it again when the operation finishes. The streams returned by
 * {@link #open(Path, int)} and
 * {@link #create(Path, FsPermission, boolean, int, short, long, Progressable)}
 * keep their connection until the stream is closed.
 * </p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FTPFileSystem extends FileSystem {

  public static final Log LOG = LogFactory
      .getLog(FTPFileSystem.class);

  public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;

  public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
  public static final String FS_FTP_USER_PREFIX = "fs.ftp.user.";
  public static final String FS_FTP_HOST = "fs.ftp.host";
  public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
  public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
  public static final String E_SAME_DIRECTORY_ONLY =
      "only same directory renames are supported";

  private URI uri;

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>ftp</code>
   */
  @Override
  public String getScheme() {
    return "ftp";
  }

  /**
   * Resolve host, port and credentials and store them in the configuration.
   * Values present in the URI take precedence over values already in
   * {@code conf}; the port defaults to {@link FTP#DEFAULT_PORT}.
   *
   * @param uri filesystem URI, optionally carrying {@code user:password}
   * @param conf configuration to read fallbacks from and to write the
   *          resolved values into
   * @throws IOException if no host, user or password can be determined
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    // get host information from uri (overrides info in conf)
    String host = uri.getHost();
    host = (host == null) ? conf.get(FS_FTP_HOST, null) : host;
    if (host == null) {
      throw new IOException("Invalid host specified");
    }
    conf.set(FS_FTP_HOST, host);

    // get port information from uri, (overrides info in conf)
    int port = uri.getPort();
    port = (port == -1) ? FTP.DEFAULT_PORT : port;
    conf.setInt(FS_FTP_HOST_PORT, port);

    // get user/password information from URI (overrides info in conf)
    String user;
    String password;
    String userAndPassword = uri.getUserInfo();
    if (userAndPassword != null) {
      // Split on the first ':' only, so that a password which itself
      // contains ':' is kept intact.
      String[] userPasswdInfo = userAndPassword.split(":", 2);
      user = userPasswdInfo[0];
      password = (userPasswdInfo.length > 1) ? userPasswdInfo[1] : null;
    } else {
      user = conf.get(FS_FTP_USER_PREFIX + host, null);
      password = conf.get(FS_FTP_PASSWORD_PREFIX + host, null);
    }
    // Validate each credential on its own: a missing value is reported as
    // an IOException here instead of being handed to conf.set() as null.
    if (user == null || password == null) {
      throw new IOException("Invalid user/password specified");
    }
    conf.set(FS_FTP_USER_PREFIX + host, user);
    conf.set(FS_FTP_PASSWORD_PREFIX + host, password);
    setConf(conf);
    this.uri = uri;
  }

  /**
   * Connect to the FTP server using configuration parameters and log in.
   * On success the client is switched to block transfer mode and binary file
   * type. If the server greeting is negative or the login is refused, the
   * socket is closed again before the exception propagates.
   *
   * @return a connected and logged-in FTPClient instance
   * @throws IOException if the connection or the login fails
   */
  private FTPClient connect() throws IOException {
    Configuration conf = getConf();
    String host = conf.get(FS_FTP_HOST);
    int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
    String user = conf.get(FS_FTP_USER_PREFIX + host);
    String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
    FTPClient client = new FTPClient();
    boolean ready = false;
    try {
      client.connect(host, port);
      int reply = client.getReplyCode();
      if (!FTPReply.isPositiveCompletion(reply)) {
        throw NetUtils.wrapException(host, port,
            NetUtils.UNKNOWN_HOST, 0,
            new ConnectException("Server response " + reply));
      }
      if (!client.login(user, password)) {
        throw new IOException("Login failed on server - " + host + ", port - "
            + port + " as user '" + user + "'");
      }
      client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
      client.setFileType(FTP.BINARY_FILE_TYPE);
      client.setBufferSize(DEFAULT_BUFFER_SIZE);
      ready = true;
      return client;
    } finally {
      if (!ready && client.isConnected()) {
        // Not logged in, so just drop the socket instead of sending a logout.
        try {
          client.disconnect();
        } catch (IOException ioe) {
          LOG.warn("Failed to close connection to " + host + ":" + port
              + " after unsuccessful connect", ioe);
        }
      }
    }
  }

  /**
   * Logout and disconnect the given FTPClient. The socket is closed even if
   * the logout command itself fails.
   *
   * @param client the client to release; {@code null} is ignored
   * @throws IOException if the logout or the disconnect fails
   * @throws FTPException if the client is not connected
   */
  private void disconnect(FTPClient client) throws IOException {
    if (client != null) {
      if (!client.isConnected()) {
        throw new FTPException("Client not connected");
      }
      try {
        boolean logoutSuccess = client.logout();
        if (!logoutSuccess) {
          // Read the reply code while the connection is still open.
          LOG.warn("Logout failed while disconnecting, error code - "
              + client.getReplyCode());
        }
      } finally {
        client.disconnect();
      }
    }
  }

  /**
   * Release a client on an error path without letting a cleanup failure
   * replace the exception that is already propagating. Does nothing if the
   * client is {@code null} or no longer connected.
   *
   * @param client the client to release
   */
  private void disconnectQuietly(FTPClient client) {
    if (client == null || !client.isConnected()) {
      return;
    }
    try {
      disconnect(client);
    } catch (IOException ioe) {
      LOG.warn("Failed to disconnect FTP client during cleanup", ioe);
    }
  }

  /**
   * Resolve against given working directory.
   *
   * @param workDir directory that relative paths are resolved against
   * @param path path to resolve
   * @return {@code path} itself if it is absolute, otherwise {@code path}
   *         resolved against {@code workDir}
   */
  private Path makeAbsolute(Path workDir, Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workDir, path);
  }

  /**
   * Open a file for reading. The FTP connection stays open until the
   * returned stream is closed; on any failure the connection is released
   * before the exception propagates.
   *
   * @param file file to open
   * @param bufferSize size passed to the client's {@code allocate} call
   * @return a stream reading the remote file
   * @throws FileNotFoundException if the path is missing or is a directory
   * @throws IOException if the transfer cannot be started
   */
  @Override
  public FSDataInputStream open(Path file, int bufferSize) throws IOException {
    FTPClient client = connect();
    boolean handedOff = false;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      FileStatus fileStat = getFileStatus(client, absolute);
      if (fileStat.isDirectory()) {
        throw new FileNotFoundException("Path " + file + " is a directory.");
      }
      client.allocate(bufferSize);
      Path parent = absolute.getParent();
      // Change to parent directory on the server. Only then can we read the
      // file on the server by opening up an InputStream. As a side effect the
      // working directory on the server is changed to the parent directory of
      // the file. The FTP client connection is closed when close() is called
      // on the FSDataInputStream.
      client.changeWorkingDirectory(parent.toUri().getPath());
      InputStream is = client.retrieveFileStream(file.getName());
      if (is == null) {
        // No data stream was opened, so there is nothing to wrap.
        throw new IOException("Unable to open file: " + file + ", Aborting");
      }
      FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
          client, statistics));
      if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
        // The ftpClient is in an inconsistent state. Must close the stream,
        // which in turn will logout and disconnect from FTP server.
        fis.close();
        throw new IOException("Unable to open file: " + file + ", Aborting");
      }
      handedOff = true;
      return fis;
    } finally {
      if (!handedOff) {
        disconnectQuietly(client);
      }
    }
  }

  /**
   * A stream obtained via this call must be closed before using other APIs of
   * this class or else the invocation will block.
   * <p>
   * Missing parent directories are created. An existing file is deleted first
   * when {@code overwrite} is set. The {@code permission},
   * {@code replication}, {@code blockSize} and {@code progress} arguments are
   * not used by this implementation.
   *
   * @throws FileAlreadyExistsException if the path exists and either
   *           {@code overwrite} is false or the path is a directory
   * @throws IOException if the parent cannot be created or the transfer
   *           cannot be started
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    final FTPClient client = connect();
    boolean handedOff = false;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      FileStatus status;
      try {
        status = getFileStatus(client, file);
      } catch (FileNotFoundException fnfe) {
        status = null;
      }
      if (status != null) {
        if (overwrite && !status.isDirectory()) {
          delete(client, file, false);
        } else {
          throw new FileAlreadyExistsException("File already exists: " + file);
        }
      }

      Path parent = absolute.getParent();
      if (parent == null
          || !mkdirs(client, parent, FsPermission.getDirDefault())) {
        parent = (parent == null) ? new Path("/") : parent;
        throw new IOException("create(): Mkdirs failed to create: " + parent);
      }
      client.allocate(bufferSize);
      // Change to parent directory on the server. Only then can we write to
      // the file on the server by opening up an OutputStream. As a side effect
      // the working directory on the server is changed to the parent directory
      // of the file. The FTP client connection is closed when close() is
      // called on the FSDataOutputStream.
      client.changeWorkingDirectory(parent.toUri().getPath());
      OutputStream out = client.storeFileStream(file.getName());
      if (out == null) {
        // No data stream was opened, so there is nothing to wrap.
        throw new IOException("Unable to create file: " + file + ", Aborting");
      }
      FSDataOutputStream fos = new FSDataOutputStream(out, statistics) {
        @Override
        public void close() throws IOException {
          boolean releasing = false;
          try {
            super.close();
            if (!client.isConnected()) {
              throw new FTPException("Client not connected");
            }
            boolean cmdCompleted = client.completePendingCommand();
            // Read the reply code while the connection is still open.
            int reply = client.getReplyCode();
            releasing = true;
            disconnect(client);
            if (!cmdCompleted) {
              throw new FTPException(
                  "Could not complete transfer, Reply Code - " + reply);
            }
          } finally {
            if (!releasing) {
              // Flushing or completing the transfer failed: still release
              // the connection.
              disconnectQuietly(client);
            }
          }
        }
      };
      if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
        // The ftpClient is in an inconsistent state. Must close the stream,
        // which in turn will logout and disconnect from FTP server.
        fos.close();
        throw new IOException("Unable to create file: " + file + ", Aborting");
      }
      handedOff = true;
      return fos;
    } finally {
      if (!handedOff) {
        disconnectQuietly(client);
      }
    }
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return true if a status can be obtained for {@code file}
   * @throws IOException on IO problems other than FileNotFoundException
   */
  private boolean exists(FTPClient client, Path file) throws IOException {
    try {
      return getFileStatus(client, file) != null;
    } catch (FileNotFoundException fnfe) {
      return false;
    }
  }

  /**
   * Delete a file or directory using a fresh connection.
   *
   * @param file path to delete
   * @param recursive whether a non-empty directory may be deleted
   * @return the result reported by the FTP delete/rmdir command, or false if
   *         the path does not exist
   * @throws IOException if the directory is not empty and {@code recursive}
   *           is false, or on other IO problems
   */
  @Override
  public boolean delete(Path file, boolean recursive) throws IOException {
    FTPClient client = connect();
    try {
      return delete(client, file, recursive);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   */
  private boolean delete(FTPClient client, Path file, boolean recursive)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.toUri().getPath();
    try {
      FileStatus fileStat = getFileStatus(client, absolute);
      if (fileStat.isFile()) {
        return client.deleteFile(pathName);
      }
    } catch (FileNotFoundException e) {
      // the file is not there
      return false;
    }
    FileStatus[] dirEntries = listStatus(client, absolute);
    if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
      throw new IOException("Directory: " + file + " is not empty.");
    }
    if (dirEntries != null) {
      for (FileStatus entry : dirEntries) {
        delete(client, new Path(absolute, entry.getPath()), recursive);
      }
    }
    return client.removeDirectory(pathName);
  }

  /**
   * Build the {@link FsAction} for one access class of an FTP listing entry.
   *
   * @param accessGroup one of the {@code FTPFile.*_ACCESS} constants
   * @param ftpFile listing entry to read the permission bits from
   * @return the combination of READ, WRITE and EXECUTE granted to that class
   */
  private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
    FsAction action = FsAction.NONE;
    // FsAction.or() returns the combined action, so the result has to be
    // assigned back; otherwise the method would always report NONE.
    if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
      action = action.or(FsAction.READ);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
      action = action.or(FsAction.WRITE);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
      action = action.or(FsAction.EXECUTE);
    }
    return action;
  }

  /**
   * Translate the user/group/world permission bits of an FTP listing entry
   * into an {@link FsPermission}.
   */
  private FsPermission getPermissions(FTPFile ftpFile) {
    FsAction user, group, others;
    user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
    group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
    others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
    return new FsPermission(user, group, others);
  }

  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * List a path using a fresh connection.
   *
   * @param file file or directory to list
   * @return the file's own status for a file, or one status per listing entry
   *         for a directory
   * @throws IOException on IO problems, including a missing path
   */
  @Override
  public FileStatus[] listStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      return listStatus(client, file);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   */
  private FileStatus[] listStatus(FTPClient client, Path file)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    FileStatus fileStat = getFileStatus(client, absolute);
    if (fileStat.isFile()) {
      return new FileStatus[] { fileStat };
    }
    FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
    FileStatus[] fileStats = new FileStatus[ftpFiles.length];
    for (int i = 0; i < ftpFiles.length; i++) {
      fileStats[i] = getFileStatus(ftpFiles[i], absolute);
    }
    return fileStats;
  }

  /**
   * Look up the status of a path using a fresh connection.
   *
   * @param file path to look up
   * @return the status of the path
   * @throws FileNotFoundException if the path does not exist
   * @throws IOException on other IO problems
   */
  @Override
  public FileStatus getFileStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      return getFileStatus(client, file);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * The status is found by listing the parent directory and matching the
   * entry by name; the root directory gets a synthetic status.
   */
  private FileStatus getFileStatus(FTPClient client, Path file)
      throws IOException {
    FileStatus fileStat = null;
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    Path parentPath = absolute.getParent();
    if (parentPath == null) { // root dir
      long length = -1; // Length of root dir on server not known
      boolean isDir = true;
      int blockReplication = 1;
      long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
      long modTime = -1; // Modification time of root dir not known.
      Path root = new Path("/");
      return new FileStatus(length, isDir, blockReplication, blockSize,
          modTime, root.makeQualified(this));
    }
    String pathName = parentPath.toUri().getPath();
    FTPFile[] ftpFiles = client.listFiles(pathName);
    if (ftpFiles != null) {
      for (FTPFile ftpFile : ftpFiles) {
        // Skip null entries instead of failing on them.
        if (ftpFile != null
            && ftpFile.getName().equals(file.getName())) { // file found in dir
          fileStat = getFileStatus(ftpFile, parentPath);
          break;
        }
      }
    }
    if (fileStat == null) {
      throw new FileNotFoundException("File " + file + " does not exist.");
    }
    return fileStat;
  }

  /**
   * Convert the file information in FTPFile to a {@link FileStatus} object.
   *
   * @param ftpFile listing entry to convert
   * @param parentPath directory the entry was listed in
   * @return FileStatus for the entry, qualified against this filesystem
   */
  private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
    long length = ftpFile.getSize();
    boolean isDir = ftpFile.isDirectory();
    int blockReplication = 1;
    // Using default block size since there is no way in FTP client to know of
    // block sizes on server. The assumption could be less than ideal.
    long blockSize = DEFAULT_BLOCK_SIZE;
    // An entry may carry no timestamp; report -1 ("not known", as for the
    // root directory) rather than failing with a NullPointerException.
    long modTime = (ftpFile.getTimestamp() == null)
        ? -1 : ftpFile.getTimestamp().getTimeInMillis();
    long accessTime = 0;
    FsPermission permission = getPermissions(ftpFile);
    String user = ftpFile.getUser();
    String group = ftpFile.getGroup();
    Path filePath = new Path(parentPath, ftpFile.getName());
    return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
        accessTime, permission, user, group, filePath.makeQualified(this));
  }

  /**
   * Create a directory and any missing parents using a fresh connection.
   *
   * @param file directory to create
   * @param permission not applied by this implementation
   * @return true if the directory exists or was created
   * @throws IOException on IO problems, or if the path is an existing file
   */
  @Override
  public boolean mkdirs(Path file, FsPermission permission) throws IOException {
    FTPClient client = connect();
    try {
      return mkdirs(client, file, permission);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * Parents are created recursively. As a side effect the working directory
   * on the server is changed to the parent of each directory that is created.
   */
  private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
      throws IOException {
    boolean created = true;
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.getName();
    if (!exists(client, absolute)) {
      Path parent = absolute.getParent();
      created = (parent == null || mkdirs(client, parent, FsPermission
          .getDirDefault()));
      if (created) {
        String parentDir = parent.toUri().getPath();
        client.changeWorkingDirectory(parentDir);
        created = created && client.makeDirectory(pathName);
      }
    } else if (isFile(client, absolute)) {
      throw new ParentNotDirectoryException(String.format(
          "Can't make directory for path %s since it is a file.", absolute));
    }
    return created;
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return true if the path exists and is a file, false if it does not exist
   * @throws FTPException if the status lookup fails for another reason
   */
  private boolean isFile(FTPClient client, Path file) {
    try {
      return getFileStatus(client, file).isFile();
    } catch (FileNotFoundException e) {
      return false; // file does not exist
    } catch (IOException ioe) {
      throw new FTPException("File check failed", ioe);
    }
  }

  /**
   * Rename a path using a fresh connection. Only renames within the same
   * parent directory are supported; see {@link #E_SAME_DIRECTORY_ONLY}.
   *
   * @param src existing path
   * @param dst new path, or an existing directory to place {@code src} under
   * @return the result reported by the FTP rename command
   * @throws IOException if the source is missing, the destination exists, or
   *           the two paths do not share a parent directory
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    FTPClient client = connect();
    try {
      return rename(client, src, dst);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Probe for a path being a parent of another.
   *
   * @param parent parent path
   * @param child possible child path
   * @return true if the parent's path matches the start of the child's
   */
  private boolean isParentOf(Path parent, Path child) {
    URI parentURI = parent.toUri();
    String parentPath = parentURI.getPath();
    if (!parentPath.endsWith("/")) {
      parentPath += "/";
    }
    URI childURI = child.toUri();
    String childPath = childURI.getPath();
    return childPath.startsWith(parentPath);
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @param client connected client to issue the commands on
   * @param src existing path
   * @param dst new path, or an existing directory to place {@code src} under
   * @return the result reported by the FTP rename command
   * @throws FileNotFoundException if {@code src} does not exist
   * @throws FileAlreadyExistsException if the resolved destination exists
   * @throws IOException if {@code src} is the root, the destination lies
   *           under the source, the parents differ, or on other IO problems
   */
  private boolean rename(FTPClient client, Path src, Path dst)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absoluteSrc = makeAbsolute(workDir, src);
    Path absoluteDst = makeAbsolute(workDir, dst);
    if (!exists(client, absoluteSrc)) {
      throw new FileNotFoundException("Source path " + src + " does not exist");
    }
    if (absoluteSrc.getParent() == null) {
      // The root has no parent directory to rename within.
      throw new IOException("Cannot rename the root directory: " + absoluteSrc);
    }
    // Probe the destination over the connection we already hold rather than
    // through the public API, which would open a second connection.
    boolean dstIsDirectory;
    try {
      dstIsDirectory = getFileStatus(client, absoluteDst).isDirectory();
    } catch (FileNotFoundException fnfe) {
      dstIsDirectory = false;
    }
    if (dstIsDirectory) {
      // destination is a directory: rename goes underneath it with the
      // source name
      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
    }
    if (exists(client, absoluteDst)) {
      throw new FileAlreadyExistsException("Destination path " + dst
          + " already exists");
    }
    // Compare and use the path component only, so that a scheme/authority
    // on one of the arguments neither breaks the comparison nor ends up in
    // the directory name sent to the server.
    String parentSrc = absoluteSrc.getParent().toUri().getPath();
    String parentDst = absoluteDst.getParent().toUri().getPath();
    if (isParentOf(absoluteSrc, absoluteDst)) {
      throw new IOException("Cannot rename " + absoluteSrc + " under itself"
          + " : " + absoluteDst);
    }

    if (!parentSrc.equals(parentDst)) {
      throw new IOException("Cannot rename source: " + absoluteSrc
          + " to " + absoluteDst
          + " -" + E_SAME_DIRECTORY_ONLY);
    }
    String from = absoluteSrc.getName();
    String to = absoluteDst.getName();
    client.changeWorkingDirectory(parentSrc);
    return client.rename(from, to);
  }

  @Override
  public Path getWorkingDirectory() {
    // Return home directory always since we do not maintain state.
    return getHomeDirectory();
  }

  /**
   * Return the directory the server reports as the working directory right
   * after login.
   *
   * @return the home directory
   * @throws FTPException if connecting, querying or disconnecting fails
   */
  @Override
  public Path getHomeDirectory() {
    FTPClient client = null;
    try {
      client = connect();
      return new Path(client.printWorkingDirectory());
    } catch (IOException ioe) {
      throw new FTPException("Failed to get home directory", ioe);
    } finally {
      try {
        disconnect(client);
      } catch (IOException ioe) {
        throw new FTPException("Failed to disconnect", ioe);
      }
    }
  }

  @Override
  public void setWorkingDirectory(Path newDir) {
    // we do not maintain the working directory state
  }
}