001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs; 020 021import java.io.BufferedOutputStream; 022import java.io.DataOutput; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.FileNotFoundException; 026import java.io.FileOutputStream; 027import java.io.IOException; 028import java.io.OutputStream; 029import java.net.URI; 030import java.nio.ByteBuffer; 031import java.util.Arrays; 032import java.util.StringTokenizer; 033 034import org.apache.hadoop.classification.InterfaceAudience; 035import org.apache.hadoop.classification.InterfaceStability; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.permission.FsPermission; 038import org.apache.hadoop.io.nativeio.NativeIO; 039import org.apache.hadoop.util.Progressable; 040import org.apache.hadoop.util.Shell; 041import org.apache.hadoop.util.StringUtils; 042 043/**************************************************************** 044 * Implement the FileSystem API for the raw local filesystem. 045 * 046 *****************************************************************/ 047@InterfaceAudience.Public 048@InterfaceStability.Stable 049public class RawLocalFileSystem extends FileSystem { 050 static final URI NAME = URI.create("file:///"); 051 private Path workingDir; 052 053 public RawLocalFileSystem() { 054 workingDir = getInitialWorkingDirectory(); 055 } 056 057 private Path makeAbsolute(Path f) { 058 if (f.isAbsolute()) { 059 return f; 060 } else { 061 return new Path(workingDir, f); 062 } 063 } 064 065 /** Convert a path to a File. */ 066 public File pathToFile(Path path) { 067 checkPath(path); 068 if (!path.isAbsolute()) { 069 path = new Path(getWorkingDirectory(), path); 070 } 071 return new File(path.toUri().getPath()); 072 } 073 074 public URI getUri() { return NAME; } 075 076 public void initialize(URI uri, Configuration conf) throws IOException { 077 super.initialize(uri, conf); 078 setConf(conf); 079 } 080 081 class TrackingFileInputStream extends FileInputStream { 082 public TrackingFileInputStream(File f) throws IOException { 083 super(f); 084 } 085 086 public int read() throws IOException { 087 int result = super.read(); 088 if (result != -1) { 089 statistics.incrementBytesRead(1); 090 } 091 return result; 092 } 093 094 public int read(byte[] data) throws IOException { 095 int result = super.read(data); 096 if (result != -1) { 097 statistics.incrementBytesRead(result); 098 } 099 return result; 100 } 101 102 public int read(byte[] data, int offset, int length) throws IOException { 103 int result = super.read(data, offset, length); 104 if (result != -1) { 105 statistics.incrementBytesRead(result); 106 } 107 return result; 108 } 109 } 110 111 /******************************************************* 112 * For open()'s FSInputStream. 113 *******************************************************/ 114 class LocalFSFileInputStream extends FSInputStream { 115 private FileInputStream fis; 116 private long position; 117 118 public LocalFSFileInputStream(Path f) throws IOException { 119 this.fis = new TrackingFileInputStream(pathToFile(f)); 120 } 121 122 public void seek(long pos) throws IOException { 123 fis.getChannel().position(pos); 124 this.position = pos; 125 } 126 127 public long getPos() throws IOException { 128 return this.position; 129 } 130 131 public boolean seekToNewSource(long targetPos) throws IOException { 132 return false; 133 } 134 135 /* 136 * Just forward to the fis 137 */ 138 public int available() throws IOException { return fis.available(); } 139 public void close() throws IOException { fis.close(); } 140 @Override 141 public boolean markSupported() { return false; } 142 143 public int read() throws IOException { 144 try { 145 int value = fis.read(); 146 if (value >= 0) { 147 this.position++; 148 } 149 return value; 150 } catch (IOException e) { // unexpected exception 151 throw new FSError(e); // assume native fs error 152 } 153 } 154 155 public int read(byte[] b, int off, int len) throws IOException { 156 try { 157 int value = fis.read(b, off, len); 158 if (value > 0) { 159 this.position += value; 160 } 161 return value; 162 } catch (IOException e) { // unexpected exception 163 throw new FSError(e); // assume native fs error 164 } 165 } 166 167 public int read(long position, byte[] b, int off, int len) 168 throws IOException { 169 ByteBuffer bb = ByteBuffer.wrap(b, off, len); 170 try { 171 return fis.getChannel().read(bb, position); 172 } catch (IOException e) { 173 throw new FSError(e); 174 } 175 } 176 177 public long skip(long n) throws IOException { 178 long value = fis.skip(n); 179 if (value > 0) { 180 this.position += value; 181 } 182 return value; 183 } 184 } 185 186 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 187 if (!exists(f)) { 188 throw new FileNotFoundException(f.toString()); 189 } 190 return new FSDataInputStream(new BufferedFSInputStream( 191 new LocalFSFileInputStream(f), bufferSize)); 192 } 193 194 /********************************************************* 195 * For create()'s FSOutputStream. 196 *********************************************************/ 197 class LocalFSFileOutputStream extends OutputStream { 198 private FileOutputStream fos; 199 200 private LocalFSFileOutputStream(Path f, boolean append) throws IOException { 201 this.fos = new FileOutputStream(pathToFile(f), append); 202 } 203 204 /* 205 * Just forward to the fos 206 */ 207 public void close() throws IOException { fos.close(); } 208 public void flush() throws IOException { fos.flush(); } 209 public void write(byte[] b, int off, int len) throws IOException { 210 try { 211 fos.write(b, off, len); 212 } catch (IOException e) { // unexpected exception 213 throw new FSError(e); // assume native fs error 214 } 215 } 216 217 public void write(int b) throws IOException { 218 try { 219 fos.write(b); 220 } catch (IOException e) { // unexpected exception 221 throw new FSError(e); // assume native fs error 222 } 223 } 224 } 225 226 /** {@inheritDoc} */ 227 public FSDataOutputStream append(Path f, int bufferSize, 228 Progressable progress) throws IOException { 229 if (!exists(f)) { 230 throw new FileNotFoundException("File " + f + " not found"); 231 } 232 if (getFileStatus(f).isDirectory()) { 233 throw new IOException("Cannot append to a diretory (=" + f + " )"); 234 } 235 return new FSDataOutputStream(new BufferedOutputStream( 236 new LocalFSFileOutputStream(f, true), bufferSize), statistics); 237 } 238 239 /** {@inheritDoc} */ 240 @Override 241 public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, 242 short replication, long blockSize, Progressable progress) 243 throws IOException { 244 return create(f, overwrite, true, bufferSize, replication, blockSize, progress); 245 } 246 247 private FSDataOutputStream create(Path f, boolean overwrite, 248 boolean createParent, int bufferSize, short replication, long blockSize, 249 Progressable progress) throws IOException { 250 if (exists(f) && !overwrite) { 251 throw new IOException("File already exists: "+f); 252 } 253 Path parent = f.getParent(); 254 if (parent != null && !mkdirs(parent)) { 255 throw new IOException("Mkdirs failed to create " + parent.toString()); 256 } 257 return new FSDataOutputStream(new BufferedOutputStream( 258 new LocalFSFileOutputStream(f, false), bufferSize), statistics); 259 } 260 261 /** {@inheritDoc} */ 262 @Override 263 public FSDataOutputStream create(Path f, FsPermission permission, 264 boolean overwrite, int bufferSize, short replication, long blockSize, 265 Progressable progress) throws IOException { 266 267 FSDataOutputStream out = create(f, 268 overwrite, bufferSize, replication, blockSize, progress); 269 setPermission(f, permission); 270 return out; 271 } 272 273 /** {@inheritDoc} */ 274 @Override 275 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 276 boolean overwrite, 277 int bufferSize, short replication, long blockSize, 278 Progressable progress) throws IOException { 279 FSDataOutputStream out = create(f, 280 overwrite, false, bufferSize, replication, blockSize, progress); 281 setPermission(f, permission); 282 return out; 283 } 284 285 public boolean rename(Path src, Path dst) throws IOException { 286 if (pathToFile(src).renameTo(pathToFile(dst))) { 287 return true; 288 } 289 return FileUtil.copy(this, src, this, dst, true, getConf()); 290 } 291 292 /** 293 * Delete the given path to a file or directory. 294 * @param p the path to delete 295 * @param recursive to delete sub-directories 296 * @return true if the file or directory and all its contents were deleted 297 * @throws IOException if p is non-empty and recursive is false 298 */ 299 public boolean delete(Path p, boolean recursive) throws IOException { 300 File f = pathToFile(p); 301 if (f.isFile()) { 302 return f.delete(); 303 } else if (!recursive && f.isDirectory() && 304 (FileUtil.listFiles(f).length != 0)) { 305 throw new IOException("Directory " + f.toString() + " is not empty"); 306 } 307 return FileUtil.fullyDelete(f); 308 } 309 310 public FileStatus[] listStatus(Path f) throws IOException { 311 File localf = pathToFile(f); 312 FileStatus[] results; 313 314 if (!localf.exists()) { 315 throw new FileNotFoundException("File " + f + " does not exist"); 316 } 317 if (localf.isFile()) { 318 return new FileStatus[] { 319 new RawLocalFileStatus(localf, getDefaultBlockSize(), this) }; 320 } 321 322 File[] names = localf.listFiles(); 323 if (names == null) { 324 return null; 325 } 326 results = new FileStatus[names.length]; 327 int j = 0; 328 for (int i = 0; i < names.length; i++) { 329 try { 330 results[j] = getFileStatus(new Path(names[i].getAbsolutePath())); 331 j++; 332 } catch (FileNotFoundException e) { 333 // ignore the files not found since the dir list may have have changed 334 // since the names[] list was generated. 335 } 336 } 337 if (j == names.length) { 338 return results; 339 } 340 return Arrays.copyOf(results, j); 341 } 342 343 /** 344 * Creates the specified directory hierarchy. Does not 345 * treat existence as an error. 346 */ 347 public boolean mkdirs(Path f) throws IOException { 348 if(f == null) { 349 throw new IllegalArgumentException("mkdirs path arg is null"); 350 } 351 Path parent = f.getParent(); 352 File p2f = pathToFile(f); 353 if(parent != null) { 354 File parent2f = pathToFile(parent); 355 if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) { 356 throw new FileAlreadyExistsException("Parent path is not a directory: " 357 + parent); 358 } 359 } 360 return (parent == null || mkdirs(parent)) && 361 (p2f.mkdir() || p2f.isDirectory()); 362 } 363 364 /** {@inheritDoc} */ 365 @Override 366 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 367 boolean b = mkdirs(f); 368 if(b) { 369 setPermission(f, permission); 370 } 371 return b; 372 } 373 374 375 @Override 376 protected boolean primitiveMkdir(Path f, FsPermission absolutePermission) 377 throws IOException { 378 boolean b = mkdirs(f); 379 setPermission(f, absolutePermission); 380 return b; 381 } 382 383 384 @Override 385 public Path getHomeDirectory() { 386 return this.makeQualified(new Path(System.getProperty("user.home"))); 387 } 388 389 /** 390 * Set the working directory to the given directory. 391 */ 392 @Override 393 public void setWorkingDirectory(Path newDir) { 394 workingDir = makeAbsolute(newDir); 395 checkPath(workingDir); 396 397 } 398 399 @Override 400 public Path getWorkingDirectory() { 401 return workingDir; 402 } 403 404 @Override 405 protected Path getInitialWorkingDirectory() { 406 return this.makeQualified(new Path(System.getProperty("user.dir"))); 407 } 408 409 /** {@inheritDoc} */ 410 @Override 411 public FsStatus getStatus(Path p) throws IOException { 412 File partition = pathToFile(p == null ? new Path("/") : p); 413 //File provides getUsableSpace() and getFreeSpace() 414 //File provides no API to obtain used space, assume used = total - free 415 return new FsStatus(partition.getTotalSpace(), 416 partition.getTotalSpace() - partition.getFreeSpace(), 417 partition.getFreeSpace()); 418 } 419 420 // In the case of the local filesystem, we can just rename the file. 421 public void moveFromLocalFile(Path src, Path dst) throws IOException { 422 rename(src, dst); 423 } 424 425 // We can write output directly to the final location 426 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 427 throws IOException { 428 return fsOutputFile; 429 } 430 431 // It's in the right place - nothing to do. 432 public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile) 433 throws IOException { 434 } 435 436 public void close() throws IOException { 437 super.close(); 438 } 439 440 public String toString() { 441 return "LocalFS"; 442 } 443 444 public FileStatus getFileStatus(Path f) throws IOException { 445 File path = pathToFile(f); 446 if (path.exists()) { 447 return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this); 448 } else { 449 throw new FileNotFoundException("File " + f + " does not exist"); 450 } 451 } 452 453 static class RawLocalFileStatus extends FileStatus { 454 /* We can add extra fields here. It breaks at least CopyFiles.FilePair(). 455 * We recognize if the information is already loaded by check if 456 * onwer.equals(""). 457 */ 458 private boolean isPermissionLoaded() { 459 return !super.getOwner().equals(""); 460 } 461 462 RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) { 463 super(f.length(), f.isDirectory(), 1, defaultBlockSize, 464 f.lastModified(), fs.makeQualified(new Path(f.getPath()))); 465 } 466 467 @Override 468 public FsPermission getPermission() { 469 if (!isPermissionLoaded()) { 470 loadPermissionInfo(); 471 } 472 return super.getPermission(); 473 } 474 475 @Override 476 public String getOwner() { 477 if (!isPermissionLoaded()) { 478 loadPermissionInfo(); 479 } 480 return super.getOwner(); 481 } 482 483 @Override 484 public String getGroup() { 485 if (!isPermissionLoaded()) { 486 loadPermissionInfo(); 487 } 488 return super.getGroup(); 489 } 490 491 /// loads permissions, owner, and group from `ls -ld` 492 private void loadPermissionInfo() { 493 IOException e = null; 494 try { 495 StringTokenizer t = new StringTokenizer( 496 execCommand(new File(getPath().toUri()), 497 Shell.getGET_PERMISSION_COMMAND())); 498 //expected format 499 //-rw------- 1 username groupname ... 500 String permission = t.nextToken(); 501 if (permission.length() > 10) { //files with ACLs might have a '+' 502 permission = permission.substring(0, 10); 503 } 504 setPermission(FsPermission.valueOf(permission)); 505 t.nextToken(); 506 setOwner(t.nextToken()); 507 setGroup(t.nextToken()); 508 } catch (Shell.ExitCodeException ioe) { 509 if (ioe.getExitCode() != 1) { 510 e = ioe; 511 } else { 512 setPermission(null); 513 setOwner(null); 514 setGroup(null); 515 } 516 } catch (IOException ioe) { 517 e = ioe; 518 } finally { 519 if (e != null) { 520 throw new RuntimeException("Error while running command to get " + 521 "file permissions : " + 522 StringUtils.stringifyException(e)); 523 } 524 } 525 } 526 527 @Override 528 public void write(DataOutput out) throws IOException { 529 if (!isPermissionLoaded()) { 530 loadPermissionInfo(); 531 } 532 super.write(out); 533 } 534 } 535 536 /** 537 * Use the command chown to set owner. 538 */ 539 @Override 540 public void setOwner(Path p, String username, String groupname) 541 throws IOException { 542 if (username == null && groupname == null) { 543 throw new IOException("username == null && groupname == null"); 544 } 545 546 if (username == null) { 547 execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 548 } else { 549 //OWNER[:[GROUP]] 550 String s = username + (groupname == null? "": ":" + groupname); 551 execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s); 552 } 553 } 554 555 /** 556 * Use the command chmod to set permission. 557 */ 558 @Override 559 public void setPermission(Path p, FsPermission permission) 560 throws IOException { 561 if (NativeIO.isAvailable()) { 562 NativeIO.chmod(pathToFile(p).getCanonicalPath(), 563 permission.toShort()); 564 } else { 565 execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND, 566 String.format("%05o", permission.toShort())); 567 } 568 } 569 570 private static String execCommand(File f, String... cmd) throws IOException { 571 String[] args = new String[cmd.length + 1]; 572 System.arraycopy(cmd, 0, args, 0, cmd.length); 573 args[cmd.length] = FileUtil.makeShellPath(f, true); 574 String output = Shell.execCommand(args); 575 return output; 576 } 577 578}