/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.ftp;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A {@link FileSystem} backed by an FTP client provided by <a
 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
 * </p>
 * <p>
 * No connection is kept between calls: every public operation opens its own
 * {@link FTPClient} and closes it when done. The streams returned by
 * {@link #open(Path, int)} and
 * {@link #create(Path, FsPermission, boolean, int, short, long, Progressable)}
 * keep their connection until the stream is closed.
 * </p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FTPFileSystem extends FileSystem {

  public static final Log LOG = LogFactory
      .getLog(FTPFileSystem.class);

  /** Buffer size handed to the FTP client after a successful login. */
  public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;

  /** Block size reported in every {@link FileStatus}; FTP exposes none. */
  public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;

  /** URI this file system was initialized with. */
  private URI uri;

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>ftp</code>
   */
  @Override
  public String getScheme() {
    return "ftp";
  }

  /**
   * Record the connection parameters in the configuration.
   * <p>
   * Host, port and credentials found in the URI take precedence over the
   * values already present in <code>conf</code>. The resolved values are
   * written back under <code>fs.ftp.host</code>,
   * <code>fs.ftp.host.port</code>, <code>fs.ftp.user.&lt;host&gt;</code> and
   * <code>fs.ftp.password.&lt;host&gt;</code>, which is where
   * {@link #connect()} reads them from.
   * </p>
   *
   * @param uri URI of the FTP server, optionally carrying
   *          <code>user:password</code> user info
   * @param conf configuration to read fallback values from and to update
   * @throws IOException if no host or no user name can be determined
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    // get host information from uri (overrides info in conf)
    String host = uri.getHost();
    host = (host == null) ? conf.get("fs.ftp.host", null) : host;
    if (host == null) {
      throw new IOException("Invalid host specified");
    }
    conf.set("fs.ftp.host", host);

    // get port information from uri; the FTP default is used when absent
    int port = uri.getPort();
    port = (port == -1) ? FTP.DEFAULT_PORT : port;
    conf.setInt("fs.ftp.host.port", port);

    // get user/password information from URI (overrides info in conf)
    String user;
    String password;
    String userAndPassword = uri.getUserInfo();
    if (userAndPassword != null) {
      // Split on the first ':' only so that passwords containing ':' survive.
      String[] userPasswdInfo = userAndPassword.split(":", 2);
      user = userPasswdInfo[0];
      password = (userPasswdInfo.length > 1) ? userPasswdInfo[1] : null;
    } else {
      user = conf.get("fs.ftp.user." + host, null);
      password = conf.get("fs.ftp.password." + host, null);
    }
    if (user == null) {
      throw new IOException("Invalid user/passsword specified");
    }
    conf.set("fs.ftp.user." + host, user);
    if (password != null) {
      // Configuration values must not be null, so an absent password leaves
      // the existing entry (if any) untouched instead of storing null.
      conf.set("fs.ftp.password." + host, password);
    }
    setConf(conf);
    this.uri = uri;
  }

  /**
   * Connect to the FTP server using configuration parameters and log in.
   * <p>
   * If the server refuses the connection or the login, the socket opened so
   * far is closed again before the exception is thrown.
   * </p>
   *
   * @return a connected, logged-in FTPClient configured for binary
   *         block-mode transfers
   * @throws IOException if connecting or logging in fails
   */
  private FTPClient connect() throws IOException {
    Configuration conf = getConf();
    String host = conf.get("fs.ftp.host");
    int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
    String user = conf.get("fs.ftp.user." + host);
    String password = conf.get("fs.ftp.password." + host);
    FTPClient client = new FTPClient();
    boolean ready = false;
    try {
      client.connect(host, port);
      int reply = client.getReplyCode();
      if (!FTPReply.isPositiveCompletion(reply)) {
        throw new IOException("Server - " + host
            + " refused connection on port - " + port);
      }
      if (!client.login(user, password)) {
        throw new IOException("Login failed on server - " + host + ", port - "
            + port);
      }
      client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
      client.setFileType(FTP.BINARY_FILE_TYPE);
      client.setBufferSize(DEFAULT_BUFFER_SIZE);
      ready = true;
      return client;
    } finally {
      if (!ready) {
        // Do not leak the socket of a half-established session.
        disconnectQuietly(client);
      }
    }
  }

  /**
   * Logout and disconnect the given FTPClient.
   * <p>
   * The socket is closed even when the logout command itself fails.
   * </p>
   *
   * @param client client to shut down; <code>null</code> is ignored
   * @throws IOException if the logout or the disconnect fails
   * @throws FTPException if the client is not connected
   */
  private void disconnect(FTPClient client) throws IOException {
    if (client != null) {
      if (!client.isConnected()) {
        throw new FTPException("Client not connected");
      }
      boolean logoutSuccess;
      try {
        logoutSuccess = client.logout();
      } finally {
        client.disconnect();
      }
      if (!logoutSuccess) {
        LOG.warn("Logout failed while disconnecting, error code - "
            + client.getReplyCode());
      }
    }
  }

  /**
   * Best-effort variant of {@link #disconnect(FTPClient)} for cleanup paths
   * where another exception is about to be (or already is being) thrown: a
   * client that is null or not connected is ignored and I/O failures are
   * logged instead of propagated.
   *
   * @param client client to shut down, may be <code>null</code>
   */
  private void disconnectQuietly(FTPClient client) {
    if (client == null || !client.isConnected()) {
      return;
    }
    try {
      disconnect(client);
    } catch (IOException ioe) {
      LOG.warn("Failed to disconnect FTP client cleanly", ioe);
    }
  }

  /**
   * Resolve against given working directory.
   *
   * @param workDir directory that relative paths are resolved against
   * @param path path to resolve
   * @return <code>path</code> itself if it is absolute, otherwise
   *         <code>path</code> resolved against <code>workDir</code>
   */
  private Path makeAbsolute(Path workDir, Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workDir, path);
  }

  /**
   * Open a file on the server for reading.
   * <p>
   * The FTP connection stays open until the returned stream is closed. If
   * the file cannot be opened, the connection is released before the
   * exception is thrown.
   * </p>
   *
   * @param file file to read
   * @param bufferSize size passed to the client's allocate command
   * @return stream over the remote file
   * @throws IOException if the path is missing, is a directory, or the
   *           transfer cannot be started
   */
  @Override
  public FSDataInputStream open(Path file, int bufferSize) throws IOException {
    FTPClient client = connect();
    // True once the returned stream is responsible for closing the client.
    boolean handedOff = false;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      FileStatus fileStat = getFileStatus(client, absolute);
      if (fileStat.isDirectory()) {
        throw new IOException("Path " + file + " is a directory.");
      }
      client.allocate(bufferSize);
      Path parent = absolute.getParent();
      // Change to parent directory on the server. Only then can we read the
      // file on the server by opening up an InputStream. As a side effect the
      // working directory on the server is changed to the parent directory of
      // the file. The FTP client connection is closed when close() is called
      // on the FSDataInputStream.
      client.changeWorkingDirectory(parent.toUri().getPath());
      InputStream is = client.retrieveFileStream(file.getName());
      if (is == null) {
        // No data connection was opened, so there is nothing to wrap.
        throw new IOException("Unable to open file: " + file + ", Aborting");
      }
      FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
          client, statistics));
      handedOff = true;
      if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
        // The ftpClient is an inconsistent state. Must close the stream
        // which in turn will logout and disconnect from FTP server
        fis.close();
        throw new IOException("Unable to open file: " + file + ", Aborting");
      }
      return fis;
    } finally {
      if (!handedOff) {
        disconnectQuietly(client);
      }
    }
  }

  /**
   * Create a file on the server and open it for writing, creating missing
   * parent directories first.
   * <p>
   * A stream obtained via this call must be closed before using other APIs of
   * this class or else the invocation will block. Closing the stream
   * completes the pending transfer and closes the FTP connection. The
   * <code>permission</code>, <code>replication</code>,
   * <code>blockSize</code> and <code>progress</code> arguments are not used.
   * </p>
   *
   * @param file file to create
   * @param overwrite whether an existing file is deleted first
   * @param bufferSize size passed to the client's allocate command
   * @return stream writing to the remote file
   * @throws IOException if the file exists and <code>overwrite</code> is
   *           false, the parent cannot be created, or the transfer cannot be
   *           started
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    final FTPClient client = connect();
    // True once the returned stream is responsible for closing the client.
    boolean handedOff = false;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      if (exists(client, file)) {
        if (overwrite) {
          delete(client, file);
        } else {
          throw new IOException("File already exists: " + file);
        }
      }

      Path parent = absolute.getParent();
      if (parent == null
          || !mkdirs(client, parent, FsPermission.getDirDefault())) {
        parent = (parent == null) ? new Path("/") : parent;
        throw new IOException("create(): Mkdirs failed to create: " + parent);
      }
      client.allocate(bufferSize);
      // Change to parent directory on the server. Only then can we write to
      // the file on the server by opening up an OutputStream. As a side effect
      // the working directory on the server is changed to the parent directory
      // of the file. The FTP client connection is closed when close() is
      // called on the FSDataOutputStream.
      client.changeWorkingDirectory(parent.toUri().getPath());
      OutputStream out = client.storeFileStream(file.getName());
      if (out == null) {
        // No data connection was opened, so there is nothing to wrap.
        throw new IOException("Unable to create file: " + file + ", Aborting");
      }
      FSDataOutputStream fos = new FSDataOutputStream(out, statistics) {
        @Override
        public void close() throws IOException {
          super.close();
          if (!client.isConnected()) {
            throw new FTPException("Client not connected");
          }
          boolean cmdCompleted = client.completePendingCommand();
          disconnect(client);
          if (!cmdCompleted) {
            throw new FTPException("Could not complete transfer, Reply Code - "
                + client.getReplyCode());
          }
        }
      };
      handedOff = true;
      if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
        // The ftpClient is an inconsistent state. Must close the stream
        // which in turn will logout and disconnect from FTP server
        fos.close();
        throw new IOException("Unable to create file: " + file + ", Aborting");
      }
      return fos;
    } finally {
      if (!handedOff) {
        disconnectQuietly(client);
      }
    }
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @param client connected client to use
   * @param file path to look up
   * @return true if a status could be obtained for the path, false if the
   *         lookup reported it as not found
   * @throws FTPException if the lookup fails for any other I/O reason
   */
  private boolean exists(FTPClient client, Path file) {
    try {
      return getFileStatus(client, file) != null;
    } catch (FileNotFoundException fnfe) {
      return false;
    } catch (IOException ioe) {
      throw new FTPException("Failed to get file status", ioe);
    }
  }

  /**
   * Delete a file or directory using a dedicated connection.
   *
   * @param file path to delete
   * @param recursive whether a non-empty directory may be deleted
   * @return true if the server reported the final delete as successful
   * @throws IOException if the path does not exist, or is a non-empty
   *           directory and <code>recursive</code> is false
   */
  @Override
  public boolean delete(Path file, boolean recursive) throws IOException {
    FTPClient client = connect();
    try {
      return delete(client, file, recursive);
    } finally {
      disconnect(client);
    }
  }

  /** @deprecated Use delete(Path, boolean) instead */
  @Deprecated
  private boolean delete(FTPClient client, Path file) throws IOException {
    return delete(client, file, false);
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * Files are deleted directly. For directories every entry is deleted first
   * (only permitted when <code>recursive</code> is true) and then the
   * directory itself is removed.
   * </p>
   */
  private boolean delete(FTPClient client, Path file, boolean recursive)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.toUri().getPath();
    FileStatus fileStat = getFileStatus(client, absolute);
    if (fileStat.isFile()) {
      return client.deleteFile(pathName);
    }
    FileStatus[] dirEntries = listStatus(client, absolute);
    if (dirEntries != null && dirEntries.length > 0 && !recursive) {
      throw new IOException("Directory: " + file + " is not empty.");
    }
    if (dirEntries != null) {
      for (FileStatus entry : dirEntries) {
        delete(client, new Path(absolute, entry.getPath()), recursive);
      }
    }
    return client.removeDirectory(pathName);
  }

  /**
   * Collect the read/write/execute permissions that the FTP listing entry
   * grants to one access group.
   *
   * @param accessGroup one of the <code>FTPFile.*_ACCESS</code> constants
   * @param ftpFile listing entry to inspect
   * @return the combined action for that group
   */
  private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
    FsAction action = FsAction.NONE;
    // FsAction is an enum: or() returns the combined value and leaves the
    // receiver unchanged, so the result has to be assigned back.
    if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
      action = action.or(FsAction.READ);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
      action = action.or(FsAction.WRITE);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
      action = action.or(FsAction.EXECUTE);
    }
    return action;
  }

  /**
   * Build an {@link FsPermission} from the user, group and world permissions
   * of an FTP listing entry.
   */
  private FsPermission getPermissions(FTPFile ftpFile) {
    FsAction user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
    FsAction group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
    FsAction others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
    return new FsPermission(user, group, others);
  }

  /** @return the URI passed to {@link #initialize(URI, Configuration)} */
  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * List a path using a dedicated connection.
   *
   * @param file file or directory to list
   * @return the status of the file itself, or of every directory entry
   * @throws IOException if the path does not exist or the listing fails
   */
  @Override
  public FileStatus[] listStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      return listStatus(client, file);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   */
  private FileStatus[] listStatus(FTPClient client, Path file)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    FileStatus fileStat = getFileStatus(client, absolute);
    if (fileStat.isFile()) {
      return new FileStatus[] { fileStat };
    }
    FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
    FileStatus[] fileStats = new FileStatus[ftpFiles.length];
    for (int i = 0; i < ftpFiles.length; i++) {
      fileStats[i] = getFileStatus(ftpFiles[i], absolute);
    }
    return fileStats;
  }

  /**
   * Look up the status of a path using a dedicated connection.
   *
   * @param file path to look up
   * @return status of the path
   * @throws FileNotFoundException if the path does not exist
   * @throws IOException if the lookup fails
   */
  @Override
  public FileStatus getFileStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      return getFileStatus(client, file);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * The status is obtained by listing the parent directory and searching it
   * for an entry with the file's name. The root directory has no parent, so
   * a synthetic directory status with unknown length and modification time
   * is returned for it.
   * </p>
   */
  private FileStatus getFileStatus(FTPClient client, Path file)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    Path parentPath = absolute.getParent();
    if (parentPath == null) { // root dir
      long length = -1; // Length of root dir on server not known
      boolean isDir = true;
      int blockReplication = 1;
      long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
      long modTime = -1; // Modification time of root dir not known.
      Path root = new Path("/");
      return new FileStatus(length, isDir, blockReplication, blockSize,
          modTime, root.makeQualified(this));
    }
    String pathName = parentPath.toUri().getPath();
    FTPFile[] ftpFiles = client.listFiles(pathName);
    if (ftpFiles != null) {
      for (FTPFile ftpFile : ftpFiles) {
        // Skip null entries instead of failing on them.
        if (ftpFile != null && ftpFile.getName().equals(file.getName())) {
          return getFileStatus(ftpFile, parentPath); // file found in dir
        }
      }
    }
    throw new FileNotFoundException("File " + file + " does not exist.");
  }

  /**
   * Convert the file information in FTPFile to a {@link FileStatus} object.
   *
   * @param ftpFile listing entry to convert
   * @param parentPath directory that contains the entry
   * @return FileStatus with replication 1, {@link #DEFAULT_BLOCK_SIZE},
   *         access time 0, and a modification time of -1 when the entry has
   *         no timestamp
   */
  private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
    long length = ftpFile.getSize();
    boolean isDir = ftpFile.isDirectory();
    int blockReplication = 1;
    // Using default block size since there is no way in FTP client to know of
    // block sizes on server. The assumption could be less than ideal.
    long blockSize = DEFAULT_BLOCK_SIZE;
    // An entry may come without a timestamp; report it as unknown (-1), the
    // same value used for the root directory, instead of failing.
    long modTime = (ftpFile.getTimestamp() == null) ? -1
        : ftpFile.getTimestamp().getTimeInMillis();
    long accessTime = 0;
    FsPermission permission = getPermissions(ftpFile);
    String user = ftpFile.getUser();
    String group = ftpFile.getGroup();
    Path filePath = new Path(parentPath, ftpFile.getName());
    return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
        accessTime, permission, user, group, filePath.makeQualified(this));
  }

  /**
   * Create a directory and any missing ancestors using a dedicated
   * connection.
   *
   * @param file directory to create
   * @param permission passed through to the internal helper, which does not
   *          apply it
   * @return true if the directory exists afterwards according to the server
   *         replies
   * @throws IOException if the path exists and is a file, or the server
   *           communication fails
   */
  @Override
  public boolean mkdirs(Path file, FsPermission permission) throws IOException {
    FTPClient client = connect();
    try {
      return mkdirs(client, file, permission);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * Missing ancestors are created recursively, parent first. As a side
   * effect the working directory on the server is changed to the parent of
   * each directory that gets created.
   * </p>
   */
  private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
      throws IOException {
    boolean created = true;
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.getName();
    if (!exists(client, absolute)) {
      Path parent = absolute.getParent();
      // A path without a parent is the root, which needs no creation.
      if (parent != null) {
        created = mkdirs(client, parent, FsPermission.getDirDefault());
        if (created) {
          String parentDir = parent.toUri().getPath();
          client.changeWorkingDirectory(parentDir);
          created = client.makeDirectory(pathName);
        }
      }
    } else if (isFile(client, absolute)) {
      throw new IOException(String.format(
          "Can't make directory for path %s since it is a file.", absolute));
    }
    return created;
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return true if the path exists and is a file, false if it is a
   *         directory or does not exist
   * @throws FTPException if the lookup fails for any other I/O reason
   */
  private boolean isFile(FTPClient client, Path file) {
    try {
      return getFileStatus(client, file).isFile();
    } catch (FileNotFoundException e) {
      return false; // file does not exist
    } catch (IOException ioe) {
      throw new FTPException("File check failed", ioe);
    }
  }

  /*
   * Assuming that parent of both source and destination is the same. Is the
   * assumption correct or it is suppose to work like 'move' ?
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    FTPClient client = connect();
    try {
      return rename(client, src, dst);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * Only renames within one directory are supported: source and destination
   * must have the same parent.
   * </p>
   *
   * @param client connected client to use
   * @param src existing path to rename
   * @param dst new path; must not exist yet
   * @return true if the server reported the rename as successful
   * @throws FileNotFoundException if the source does not exist
   * @throws IOException if the destination exists, the source is the root
   *           directory, or the parents differ
   */
  private boolean rename(FTPClient client, Path src, Path dst)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absoluteSrc = makeAbsolute(workDir, src);
    Path absoluteDst = makeAbsolute(workDir, dst);
    if (!exists(client, absoluteSrc)) {
      throw new FileNotFoundException("Source path " + src
          + " does not exist");
    }
    if (exists(client, absoluteDst)) {
      throw new IOException("Destination path " + dst
          + " already exist, cannot rename!");
    }
    Path srcParent = absoluteSrc.getParent();
    Path dstParent = absoluteDst.getParent();
    if (srcParent == null || dstParent == null) {
      // Only the root has no parent, and the root cannot be renamed.
      throw new IOException("Cannot rename the root directory: " + src);
    }
    String parentSrc = srcParent.toUri().toString();
    String parentDst = dstParent.toUri().toString();
    String from = src.getName();
    String to = dst.getName();
    if (!parentSrc.equals(parentDst)) {
      throw new IOException("Cannot rename parent(source): " + parentSrc
          + ", parent(destination):  " + parentDst);
    }
    client.changeWorkingDirectory(parentSrc);
    return client.rename(from, to);
  }

  @Override
  public Path getWorkingDirectory() {
    // Return home directory always since we do not maintain state.
    return getHomeDirectory();
  }

  /**
   * Ask the server for the directory a fresh session starts in.
   *
   * @return the working directory reported right after login
   * @throws FTPException if connecting, querying or disconnecting fails
   */
  @Override
  public Path getHomeDirectory() {
    FTPClient client = null;
    try {
      client = connect();
      return new Path(client.printWorkingDirectory());
    } catch (IOException ioe) {
      throw new FTPException("Failed to get home directory", ioe);
    } finally {
      try {
        disconnect(client);
      } catch (IOException ioe) {
        throw new FTPException("Failed to disconnect", ioe);
      }
    }
  }

  @Override
  public void setWorkingDirectory(Path newDir) {
    // we do not maintain the working directory state
  }
}