/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.ftp;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

/**
 * <p>
 * A {@link FileSystem} backed by an FTP client provided by <a
 * href="http://commons.apache.org/net/">Apache Commons Net</a>.
 * </p>
 * <p>
 * No connection or working-directory state is kept between calls: every
 * public operation opens its own {@link FTPClient} connection and closes it
 * again, except {@link #open(Path, int)} and
 * {@link #create(Path, FsPermission, boolean, int, short, long, Progressable)},
 * whose connection is handed over to the returned stream.
 * </p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FTPFileSystem extends FileSystem {

  public static final Log LOG = LogFactory
      .getLog(FTPFileSystem.class);

  /** Buffer size, in bytes, configured on every new {@link FTPClient}. */
  public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;

  /** Block size reported in every {@link FileStatus}; FTP exposes none. */
  public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;

  private URI uri;

  /**
   * Records host, port and credentials in the configuration so that
   * {@link #connect()} can read them back later.
   * <p>
   * Host and credentials found in the URI take precedence over the values
   * already present in {@code conf}. The port comes from the URI, or is
   * {@link FTP#DEFAULT_PORT} when the URI carries none.
   *
   * @param uri the file system URI, optionally carrying
   *          {@code user:password@host:port}
   * @param conf the configuration to read fallbacks from and to update
   * @throws IOException if no host or no user name can be determined
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    // get host information from uri (overrides info in conf)
    String host = uri.getHost();
    host = (host == null) ? conf.get("fs.ftp.host", null) : host;
    if (host == null) {
      throw new IOException("Invalid host specified");
    }
    conf.set("fs.ftp.host", host);

    // get port information from uri
    int port = uri.getPort();
    port = (port == -1) ? FTP.DEFAULT_PORT : port;
    conf.setInt("fs.ftp.host.port", port);

    // get user/password information from URI (overrides info in conf)
    String userKey = "fs.ftp.user." + host;
    String passwordKey = "fs.ftp.password." + host;
    String user;
    String password;
    String userAndPassword = uri.getUserInfo();
    if (userAndPassword != null) {
      // Split on the first ':' only, so a password may itself contain ':'.
      String[] userPasswdInfo = userAndPassword.split(":", 2);
      user = userPasswdInfo[0];
      password = (userPasswdInfo.length > 1) ? userPasswdInfo[1] : null;
    } else {
      user = conf.get(userKey, null);
      password = conf.get(passwordKey, null);
    }
    if (user == null) {
      throw new IOException("Invalid user/password specified");
    }
    conf.set(userKey, user);
    if (password != null) {
      conf.set(passwordKey, password);
    }
    // When no password was supplied the password key is left untouched:
    // a null value must not be passed to conf.set().
    setConf(conf);
    this.uri = uri;
  }

  /**
   * Connect to the FTP server using configuration parameters.
   * <p>
   * On success the client is logged in and switched to block transfer mode
   * and binary file type. On any failure the half-open connection is closed
   * before the exception propagates.
   *
   * @return An FTPClient instance
   * @throws IOException if the server refuses the connection, the login is
   *           rejected, or an I/O error occurs
   */
  private FTPClient connect() throws IOException {
    Configuration conf = getConf();
    String host = conf.get("fs.ftp.host");
    int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
    String user = conf.get("fs.ftp.user." + host);
    String password = conf.get("fs.ftp.password." + host);
    FTPClient client = new FTPClient();
    boolean ready = false;
    try {
      client.connect(host, port);
      int reply = client.getReplyCode();
      if (!FTPReply.isPositiveCompletion(reply)) {
        throw new IOException("Server - " + host
            + " refused connection on port - " + port);
      }
      if (!client.login(user, password)) {
        throw new IOException("Login failed on server - " + host + ", port - "
            + port);
      }
      client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
      client.setFileType(FTP.BINARY_FILE_TYPE);
      client.setBufferSize(DEFAULT_BUFFER_SIZE);
      ready = true;
      return client;
    } finally {
      // Do not leak the socket when the handshake or login failed.
      if (!ready && client.isConnected()) {
        try {
          client.disconnect();
        } catch (IOException ignored) {
          // The original failure is the one worth reporting.
          LOG.debug("Failed to close FTP connection after a failed connect",
              ignored);
        }
      }
    }
  }

  /**
   * Logout and disconnect the given FTPClient.
   *
   * @param client the client to close; {@code null} is ignored
   * @throws IOException if logging out or closing the connection fails
   * @throws FTPException if the client is not connected
   */
  private void disconnect(FTPClient client) throws IOException {
    if (client == null) {
      return;
    }
    if (!client.isConnected()) {
      throw new FTPException("Client not connected");
    }
    boolean logoutSuccess = false;
    try {
      logoutSuccess = client.logout();
    } finally {
      // Always release the socket, even when the logout command failed.
      client.disconnect();
    }
    if (!logoutSuccess) {
      LOG.warn("Logout failed while disconnecting, error code - "
          + client.getReplyCode());
    }
  }

  /**
   * Best-effort variant of {@link #disconnect(FTPClient)} for error paths:
   * never throws, so it cannot mask the exception that is being propagated.
   *
   * @param client the client to close; {@code null} or an already
   *          disconnected client is ignored
   */
  private void disconnectQuietly(FTPClient client) {
    if (client == null || !client.isConnected()) {
      return;
    }
    try {
      disconnect(client);
    } catch (IOException ioe) {
      LOG.warn("Failed to disconnect FTP client", ioe);
    }
  }

  /**
   * Resolve against given working directory.
   *
   * @param workDir the directory relative paths are resolved against
   * @param path the path to resolve
   * @return {@code path} itself when it is absolute, otherwise {@code path}
   *         resolved against {@code workDir}
   */
  private Path makeAbsolute(Path workDir, Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workDir, path);
  }

  /**
   * Opens a file for reading over a dedicated FTP connection.
   * <p>
   * The FTP client connection is closed when close() is called on the
   * returned FSDataInputStream. If opening fails, the connection is closed
   * here before the exception propagates.
   *
   * @param file the file to read
   * @param bufferSize passed to {@link FTPClient#allocate(int)}
   * @return a stream over the remote file
   * @throws IOException if the path is a directory, does not exist, or the
   *           transfer cannot be started
   */
  @Override
  public FSDataInputStream open(Path file, int bufferSize) throws IOException {
    FTPClient client = connect();
    // Becomes true once the returned stream is responsible for the client.
    boolean handedOff = false;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      FileStatus fileStat = getFileStatus(client, absolute);
      if (fileStat.isDirectory()) {
        throw new IOException("Path " + file + " is a directory.");
      }
      client.allocate(bufferSize);
      Path parent = absolute.getParent();
      // Change to parent directory on the server. Only then can we read the
      // file on the server by opening up an InputStream. As a side effect the
      // working directory on the server is changed to the parent directory of
      // the file.
      client.changeWorkingDirectory(parent.toUri().getPath());
      InputStream is = client.retrieveFileStream(file.getName());
      FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
          client, statistics));
      handedOff = true;
      if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
        // The ftpClient is an inconsistent state. Must close the stream
        // which in turn will logout and disconnect from FTP server
        fis.close();
        throw new IOException("Unable to open file: " + file + ", Aborting");
      }
      return fis;
    } finally {
      if (!handedOff) {
        disconnectQuietly(client);
      }
    }
  }

  /**
   * A stream obtained via this call must be closed before using other APIs of
   * this class or else the invocation will block.
   * <p>
   * Missing parent directories are created. The FTP client connection is
   * closed when close() is called on the returned FSDataOutputStream. If
   * creation fails, the connection is closed here before the exception
   * propagates.
   *
   * @param file the file to create
   * @param permission not used by this implementation
   * @param overwrite whether an existing file may be replaced
   * @param bufferSize passed to {@link FTPClient#allocate(int)}
   * @param replication not used by this implementation
   * @param blockSize not used by this implementation
   * @param progress not used by this implementation
   * @return a stream writing to the remote file
   * @throws IOException if the file exists and {@code overwrite} is false,
   *           the parent directory cannot be created, or the transfer cannot
   *           be started
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    final FTPClient client = connect();
    // Becomes true once the returned stream is responsible for the client.
    boolean handedOff = false;
    try {
      Path workDir = new Path(client.printWorkingDirectory());
      Path absolute = makeAbsolute(workDir, file);
      if (exists(client, file)) {
        if (!overwrite) {
          throw new IOException("File already exists: " + file);
        }
        delete(client, file);
      }

      Path parent = absolute.getParent();
      if (parent == null
          || !mkdirs(client, parent, FsPermission.getDirDefault())) {
        parent = (parent == null) ? new Path("/") : parent;
        throw new IOException("create(): Mkdirs failed to create: " + parent);
      }
      client.allocate(bufferSize);
      // Change to parent directory on the server. Only then can we write to
      // the file on the server by opening up an OutputStream. As a side effect
      // the working directory on the server is changed to the parent directory
      // of the file.
      client.changeWorkingDirectory(parent.toUri().getPath());
      FSDataOutputStream fos = new FSDataOutputStream(
          client.storeFileStream(file.getName()), statistics) {
        @Override
        public void close() throws IOException {
          super.close();
          if (!client.isConnected()) {
            throw new FTPException("Client not connected");
          }
          // The transfer is only finished once the server confirms it.
          boolean cmdCompleted = client.completePendingCommand();
          disconnect(client);
          if (!cmdCompleted) {
            throw new FTPException("Could not complete transfer, Reply Code - "
                + client.getReplyCode());
          }
        }
      };
      handedOff = true;
      if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
        // The ftpClient is an inconsistent state. Must close the stream
        // which in turn will logout and disconnect from FTP server
        fos.close();
        throw new IOException("Unable to create file: " + file + ", Aborting");
      }
      return fos;
    } finally {
      if (!handedOff) {
        disconnectQuietly(client);
      }
    }
  }

  /**
   * This optional operation is not yet supported.
   *
   * @throws IOException always
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return true if a status could be obtained for {@code file}, false if it
   *         was not found
   * @throws FTPException if the status lookup fails for any other reason
   */
  private boolean exists(FTPClient client, Path file) {
    try {
      return getFileStatus(client, file) != null;
    } catch (FileNotFoundException fnfe) {
      return false;
    } catch (IOException ioe) {
      throw new FTPException("Failed to get file status", ioe);
    }
  }

  /**
   * Deletes a file or directory over a dedicated connection.
   *
   * @param file the path to delete
   * @param recursive whether a non-empty directory may be deleted
   * @return the result reported by the FTP server for the delete command
   * @throws IOException if the path does not exist, a non-empty directory is
   *           deleted non-recursively, or an I/O error occurs
   */
  @Override
  public boolean delete(Path file, boolean recursive) throws IOException {
    FTPClient client = connect();
    try {
      return delete(client, file, recursive);
    } finally {
      disconnect(client);
    }
  }

  /** @deprecated Use delete(Path, boolean) instead */
  @Deprecated
  private boolean delete(FTPClient client, Path file) throws IOException {
    return delete(client, file, false);
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   */
  private boolean delete(FTPClient client, Path file, boolean recursive)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.toUri().getPath();
    FileStatus fileStat = getFileStatus(client, absolute);
    if (fileStat.isFile()) {
      return client.deleteFile(pathName);
    }
    FileStatus[] dirEntries = listStatus(client, absolute);
    if (dirEntries != null && dirEntries.length > 0 && !recursive) {
      throw new IOException("Directory: " + file + " is not empty.");
    }
    if (dirEntries != null) {
      // Empty the directory first; it is removed only after its children.
      for (FileStatus entry : dirEntries) {
        delete(client, new Path(absolute, entry.getPath()), recursive);
      }
    }
    return client.removeDirectory(pathName);
  }

  /**
   * Translates the read/write/execute flags of one access class of an FTP
   * listing entry into an {@link FsAction}.
   *
   * @param accessGroup one of the {@code FTPFile.*_ACCESS} constants
   * @param ftpFile the listing entry to inspect
   * @return the combined action granted to that access class
   */
  private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
    FsAction action = FsAction.NONE;
    // FsAction.or() returns the combined value instead of modifying the
    // receiver, so the result has to be assigned back.
    if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
      action = action.or(FsAction.READ);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
      action = action.or(FsAction.WRITE);
    }
    if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
      action = action.or(FsAction.EXECUTE);
    }
    return action;
  }

  /**
   * Builds the user/group/other permission triple of an FTP listing entry.
   *
   * @param ftpFile the listing entry to inspect
   * @return the permissions of {@code ftpFile}
   */
  private FsPermission getPermissions(FTPFile ftpFile) {
    FsAction user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
    FsAction group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
    FsAction others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
    return new FsPermission(user, group, others);
  }

  /** @return the URI this file system was initialized with */
  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * Lists a directory, or returns the single status of a file, over a
   * dedicated connection.
   *
   * @param file the path to list
   * @return the statuses of the directory entries, or a one-element array
   *         when {@code file} is a file
   * @throws IOException if the path does not exist or an I/O error occurs
   */
  @Override
  public FileStatus[] listStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      return listStatus(client, file);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   */
  private FileStatus[] listStatus(FTPClient client, Path file)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    FileStatus fileStat = getFileStatus(client, absolute);
    if (fileStat.isFile()) {
      return new FileStatus[] { fileStat };
    }
    FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
    FileStatus[] fileStats = new FileStatus[ftpFiles.length];
    for (int i = 0; i < ftpFiles.length; i++) {
      fileStats[i] = getFileStatus(ftpFiles[i], absolute);
    }
    return fileStats;
  }

  /**
   * Looks up the status of a path over a dedicated connection.
   *
   * @param file the path to look up
   * @return the status of {@code file}
   * @throws FileNotFoundException if the path does not exist
   * @throws IOException if an I/O error occurs
   */
  @Override
  public FileStatus getFileStatus(Path file) throws IOException {
    FTPClient client = connect();
    try {
      return getFileStatus(client, file);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   * <p>
   * The status is found by listing the parent directory and matching the
   * entry name. The root directory has no parent, so a synthetic directory
   * status is returned for it.
   */
  private FileStatus getFileStatus(FTPClient client, Path file)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    Path parentPath = absolute.getParent();
    if (parentPath == null) { // root dir
      long length = -1; // Length of root dir on server not known
      boolean isDir = true;
      int blockReplication = 1;
      long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
      long modTime = -1; // Modification time of root dir not known.
      Path root = new Path("/");
      return new FileStatus(length, isDir, blockReplication, blockSize,
          modTime, root.makeQualified(this));
    }
    String pathName = parentPath.toUri().getPath();
    FTPFile[] ftpFiles = client.listFiles(pathName);
    if (ftpFiles != null) {
      for (FTPFile ftpFile : ftpFiles) {
        if (ftpFile.getName().equals(file.getName())) { // file found in dir
          return getFileStatus(ftpFile, parentPath);
        }
      }
    }
    throw new FileNotFoundException("File " + file + " does not exist.");
  }

  /**
   * Convert the file information in FTPFile to a {@link FileStatus} object.
   *
   * @param ftpFile the listing entry to convert
   * @param parentPath the directory the entry was listed from
   * @return FileStatus
   */
  private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
    long length = ftpFile.getSize();
    boolean isDir = ftpFile.isDirectory();
    int blockReplication = 1;
    // Using default block size since there is no way in FTP client to know of
    // block sizes on server. The assumption could be less than ideal.
    long blockSize = DEFAULT_BLOCK_SIZE;
    long modTime = ftpFile.getTimestamp().getTimeInMillis();
    long accessTime = 0;
    FsPermission permission = getPermissions(ftpFile);
    String user = ftpFile.getUser();
    String group = ftpFile.getGroup();
    Path filePath = new Path(parentPath, ftpFile.getName());
    return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
        accessTime, permission, user, group, filePath.makeQualified(this));
  }

  /**
   * Creates a directory and any missing ancestors over a dedicated
   * connection.
   *
   * @param file the directory to create
   * @param permission not applied by this implementation
   * @return true if the directory exists afterwards
   * @throws IOException if the path is an existing file or an I/O error
   *           occurs
   */
  @Override
  public boolean mkdirs(Path file, FsPermission permission) throws IOException {
    FTPClient client = connect();
    try {
      return mkdirs(client, file, permission);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   */
  private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
      throws IOException {
    boolean created = true;
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    String pathName = absolute.getName();
    if (!exists(client, absolute)) {
      // The root always "exists" (see getFileStatus), so a missing path has
      // a parent; ancestors are created first, then this directory inside
      // its parent.
      Path parent = absolute.getParent();
      created = (parent == null || mkdirs(client, parent, FsPermission
          .getDirDefault()));
      if (created) {
        String parentDir = parent.toUri().getPath();
        client.changeWorkingDirectory(parentDir);
        created = client.makeDirectory(pathName);
      }
    } else if (isFile(client, absolute)) {
      throw new IOException(String.format(
          "Can't make directory for path %s since it is a file.", absolute));
    }
    return created;
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @return true if {@code file} exists and is a file, false if it is a
   *         directory or does not exist
   * @throws FTPException if the status lookup fails for any other reason
   */
  private boolean isFile(FTPClient client, Path file) {
    try {
      return getFileStatus(client, file).isFile();
    } catch (FileNotFoundException e) {
      return false; // file does not exist
    } catch (IOException ioe) {
      throw new FTPException("File check failed", ioe);
    }
  }

  /**
   * Renames a path within its directory over a dedicated connection.
   * <p>
   * Source and destination must share the same parent directory; moving
   * between directories is not supported.
   *
   * @param src the existing path
   * @param dst the new path, which must not exist yet
   * @return the result reported by the FTP server for the rename command
   * @throws IOException if {@code src} is missing, {@code dst} exists, or
   *           the two paths have different parents
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    FTPClient client = connect();
    try {
      return rename(client, src, dst);
    } finally {
      disconnect(client);
    }
  }

  /**
   * Convenience method, so that we don't open a new connection when using this
   * method from within another method. Otherwise every API invocation incurs
   * the overhead of opening/closing a TCP connection.
   *
   * @param client a connected, logged-in client
   * @param src the existing path
   * @param dst the new path
   * @return the result reported by the FTP server for the rename command
   * @throws IOException if the preconditions of
   *           {@link #rename(Path, Path)} are violated
   */
  private boolean rename(FTPClient client, Path src, Path dst)
      throws IOException {
    Path workDir = new Path(client.printWorkingDirectory());
    Path absoluteSrc = makeAbsolute(workDir, src);
    Path absoluteDst = makeAbsolute(workDir, dst);
    if (!exists(client, absoluteSrc)) {
      throw new IOException("Source path " + src + " does not exist");
    }
    if (exists(client, absoluteDst)) {
      throw new IOException("Destination path " + dst
          + " already exist, cannot rename!");
    }
    Path srcParent = absoluteSrc.getParent();
    if (srcParent == null) {
      // The root "exists" but has no parent to rename it in.
      throw new IOException("Cannot rename the root directory");
    }
    // Use the server-side path (no scheme/authority), as the other
    // operations do, so changeWorkingDirectory() gets a usable directory.
    String parentSrc = srcParent.toUri().getPath();
    String parentDst = absoluteDst.getParent().toUri().getPath();
    if (!parentSrc.equals(parentDst)) {
      throw new IOException("Cannot rename parent(source): " + parentSrc
          + ", parent(destination):  " + parentDst);
    }
    client.changeWorkingDirectory(parentSrc);
    return client.rename(src.getName(), dst.getName());
  }

  @Override
  public Path getWorkingDirectory() {
    // Return home directory always since we do not maintain state.
    return getHomeDirectory();
  }

  /**
   * Returns the directory the server places a freshly logged-in client in.
   *
   * @return the initial working directory reported by the server
   * @throws FTPException if connecting, querying or disconnecting fails
   */
  @Override
  public Path getHomeDirectory() {
    FTPClient client = null;
    try {
      client = connect();
      return new Path(client.printWorkingDirectory());
    } catch (IOException ioe) {
      throw new FTPException("Failed to get home directory", ioe);
    } finally {
      try {
        disconnect(client);
      } catch (IOException ioe) {
        throw new FTPException("Failed to disconnect", ioe);
      }
    }
  }

  @Override
  public void setWorkingDirectory(Path newDir) {
    // we do not maintain the working directory state
  }
}