001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.fs; 020 021 import java.io.*; 022 import java.nio.channels.ClosedChannelException; 023 import java.util.Arrays; 024 025 import org.apache.hadoop.classification.InterfaceAudience; 026 import org.apache.hadoop.classification.InterfaceStability; 027 import org.apache.hadoop.conf.Configuration; 028 import org.apache.hadoop.fs.permission.FsPermission; 029 import org.apache.hadoop.util.Progressable; 030 import org.apache.hadoop.util.PureJavaCrc32; 031 032 /**************************************************************** 033 * Abstract Checksumed FileSystem. 034 * It provide a basic implementation of a Checksumed FileSystem, 035 * which creates a checksum file for each raw file. 036 * It generates & verifies checksums at the client side. 037 * 038 *****************************************************************/ 039 @InterfaceAudience.Public 040 @InterfaceStability.Stable 041 public abstract class ChecksumFileSystem extends FilterFileSystem { 042 private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0}; 043 private int bytesPerChecksum = 512; 044 private boolean verifyChecksum = true; 045 private boolean writeChecksum = true; 046 047 public static double getApproxChkSumLength(long size) { 048 return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size; 049 } 050 051 public ChecksumFileSystem(FileSystem fs) { 052 super(fs); 053 } 054 055 @Override 056 public void setConf(Configuration conf) { 057 super.setConf(conf); 058 if (conf != null) { 059 bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY, 060 LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT); 061 } 062 } 063 064 /** 065 * Set whether to verify checksum. 066 */ 067 @Override 068 public void setVerifyChecksum(boolean verifyChecksum) { 069 this.verifyChecksum = verifyChecksum; 070 } 071 072 @Override 073 public void setWriteChecksum(boolean writeChecksum) { 074 this.writeChecksum = writeChecksum; 075 } 076 077 /** get the raw file system */ 078 @Override 079 public FileSystem getRawFileSystem() { 080 return fs; 081 } 082 083 /** Return the name of the checksum file associated with a file.*/ 084 public Path getChecksumFile(Path file) { 085 return new Path(file.getParent(), "." + file.getName() + ".crc"); 086 } 087 088 /** Return true iff file is a checksum file name.*/ 089 public static boolean isChecksumFile(Path file) { 090 String name = file.getName(); 091 return name.startsWith(".") && name.endsWith(".crc"); 092 } 093 094 /** Return the length of the checksum file given the size of the 095 * actual file. 096 **/ 097 public long getChecksumFileLength(Path file, long fileSize) { 098 return getChecksumLength(fileSize, getBytesPerSum()); 099 } 100 101 /** Return the bytes Per Checksum */ 102 public int getBytesPerSum() { 103 return bytesPerChecksum; 104 } 105 106 private int getSumBufferSize(int bytesPerSum, int bufferSize) { 107 int defaultBufferSize = getConf().getInt( 108 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 109 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT); 110 int proportionalBufferSize = bufferSize / bytesPerSum; 111 return Math.max(bytesPerSum, 112 Math.max(proportionalBufferSize, defaultBufferSize)); 113 } 114 115 /******************************************************* 116 * For open()'s FSInputStream 117 * It verifies that data matches checksums. 118 *******************************************************/ 119 private static class ChecksumFSInputChecker extends FSInputChecker { 120 private ChecksumFileSystem fs; 121 private FSDataInputStream datas; 122 private FSDataInputStream sums; 123 124 private static final int HEADER_LENGTH = 8; 125 126 private int bytesPerSum = 1; 127 128 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file) 129 throws IOException { 130 this(fs, file, fs.getConf().getInt( 131 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 132 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT)); 133 } 134 135 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) 136 throws IOException { 137 super( file, fs.getFileStatus(file).getReplication() ); 138 this.datas = fs.getRawFileSystem().open(file, bufferSize); 139 this.fs = fs; 140 Path sumFile = fs.getChecksumFile(file); 141 try { 142 int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize); 143 sums = fs.getRawFileSystem().open(sumFile, sumBufferSize); 144 145 byte[] version = new byte[CHECKSUM_VERSION.length]; 146 sums.readFully(version); 147 if (!Arrays.equals(version, CHECKSUM_VERSION)) 148 throw new IOException("Not a checksum file: "+sumFile); 149 this.bytesPerSum = sums.readInt(); 150 set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4); 151 } catch (FileNotFoundException e) { // quietly ignore 152 set(fs.verifyChecksum, null, 1, 0); 153 } catch (IOException e) { // loudly ignore 154 LOG.warn("Problem opening checksum file: "+ file + 155 ". Ignoring exception: " , e); 156 set(fs.verifyChecksum, null, 1, 0); 157 } 158 } 159 160 private long getChecksumFilePos( long dataPos ) { 161 return HEADER_LENGTH + 4*(dataPos/bytesPerSum); 162 } 163 164 @Override 165 protected long getChunkPosition( long dataPos ) { 166 return dataPos/bytesPerSum*bytesPerSum; 167 } 168 169 @Override 170 public int available() throws IOException { 171 return datas.available() + super.available(); 172 } 173 174 @Override 175 public int read(long position, byte[] b, int off, int len) 176 throws IOException { 177 // parameter check 178 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { 179 throw new IndexOutOfBoundsException(); 180 } else if (len == 0) { 181 return 0; 182 } 183 if( position<0 ) { 184 throw new IllegalArgumentException( 185 "Parameter position can not to be negative"); 186 } 187 188 ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file); 189 checker.seek(position); 190 int nread = checker.read(b, off, len); 191 checker.close(); 192 return nread; 193 } 194 195 @Override 196 public void close() throws IOException { 197 datas.close(); 198 if( sums != null ) { 199 sums.close(); 200 } 201 set(fs.verifyChecksum, null, 1, 0); 202 } 203 204 205 @Override 206 public boolean seekToNewSource(long targetPos) throws IOException { 207 long sumsPos = getChecksumFilePos(targetPos); 208 fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos); 209 boolean newDataSource = datas.seekToNewSource(targetPos); 210 return sums.seekToNewSource(sumsPos) || newDataSource; 211 } 212 213 @Override 214 protected int readChunk(long pos, byte[] buf, int offset, int len, 215 byte[] checksum) throws IOException { 216 217 boolean eof = false; 218 if (needChecksum()) { 219 assert checksum != null; // we have a checksum buffer 220 assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length 221 assert len >= bytesPerSum; // we must read at least one chunk 222 223 final int checksumsToRead = Math.min( 224 len/bytesPerSum, // number of checksums based on len to read 225 checksum.length / CHECKSUM_SIZE); // size of checksum buffer 226 long checksumPos = getChecksumFilePos(pos); 227 if(checksumPos != sums.getPos()) { 228 sums.seek(checksumPos); 229 } 230 231 int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead); 232 if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) { 233 throw new ChecksumException( 234 "Checksum file not a length multiple of checksum size " + 235 "in " + file + " at " + pos + " checksumpos: " + checksumPos + 236 " sumLenread: " + sumLenRead, 237 pos); 238 } 239 if (sumLenRead <= 0) { // we're at the end of the file 240 eof = true; 241 } else { 242 // Adjust amount of data to read based on how many checksum chunks we read 243 len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE)); 244 } 245 } 246 if(pos != datas.getPos()) { 247 datas.seek(pos); 248 } 249 int nread = readFully(datas, buf, offset, len); 250 if (eof && nread > 0) { 251 throw new ChecksumException("Checksum error: "+file+" at "+pos, pos); 252 } 253 return nread; 254 } 255 } 256 257 private static class FSDataBoundedInputStream extends FSDataInputStream { 258 private FileSystem fs; 259 private Path file; 260 private long fileLen = -1L; 261 262 FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) 263 throws IOException { 264 super(in); 265 this.fs = fs; 266 this.file = file; 267 } 268 269 @Override 270 public boolean markSupported() { 271 return false; 272 } 273 274 /* Return the file length */ 275 private long getFileLength() throws IOException { 276 if( fileLen==-1L ) { 277 fileLen = fs.getContentSummary(file).getLength(); 278 } 279 return fileLen; 280 } 281 282 /** 283 * Skips over and discards <code>n</code> bytes of data from the 284 * input stream. 285 * 286 *The <code>skip</code> method skips over some smaller number of bytes 287 * when reaching end of file before <code>n</code> bytes have been skipped. 288 * The actual number of bytes skipped is returned. If <code>n</code> is 289 * negative, no bytes are skipped. 290 * 291 * @param n the number of bytes to be skipped. 292 * @return the actual number of bytes skipped. 293 * @exception IOException if an I/O error occurs. 294 * ChecksumException if the chunk to skip to is corrupted 295 */ 296 @Override 297 public synchronized long skip(long n) throws IOException { 298 long curPos = getPos(); 299 long fileLength = getFileLength(); 300 if( n+curPos > fileLength ) { 301 n = fileLength - curPos; 302 } 303 return super.skip(n); 304 } 305 306 /** 307 * Seek to the given position in the stream. 308 * The next read() will be from that position. 309 * 310 * <p>This method does not allow seek past the end of the file. 311 * This produces IOException. 312 * 313 * @param pos the postion to seek to. 314 * @exception IOException if an I/O error occurs or seeks after EOF 315 * ChecksumException if the chunk to seek to is corrupted 316 */ 317 318 @Override 319 public synchronized void seek(long pos) throws IOException { 320 if(pos>getFileLength()) { 321 throw new IOException("Cannot seek after EOF"); 322 } 323 super.seek(pos); 324 } 325 326 } 327 328 /** 329 * Opens an FSDataInputStream at the indicated Path. 330 * @param f the file name to open 331 * @param bufferSize the size of the buffer to be used. 332 */ 333 @Override 334 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 335 FileSystem fs; 336 InputStream in; 337 if (verifyChecksum) { 338 fs = this; 339 in = new ChecksumFSInputChecker(this, f, bufferSize); 340 } else { 341 fs = getRawFileSystem(); 342 in = fs.open(f, bufferSize); 343 } 344 return new FSDataBoundedInputStream(fs, f, in); 345 } 346 347 @Override 348 public FSDataOutputStream append(Path f, int bufferSize, 349 Progressable progress) throws IOException { 350 throw new IOException("Not supported"); 351 } 352 353 /** 354 * Calculated the length of the checksum file in bytes. 355 * @param size the length of the data file in bytes 356 * @param bytesPerSum the number of bytes in a checksum block 357 * @return the number of bytes in the checksum file 358 */ 359 public static long getChecksumLength(long size, int bytesPerSum) { 360 //the checksum length is equal to size passed divided by bytesPerSum + 361 //bytes written in the beginning of the checksum file. 362 return ((size + bytesPerSum - 1) / bytesPerSum) * 4 + 363 CHECKSUM_VERSION.length + 4; 364 } 365 366 /** This class provides an output stream for a checksummed file. 367 * It generates checksums for data. */ 368 private static class ChecksumFSOutputSummer extends FSOutputSummer { 369 private FSDataOutputStream datas; 370 private FSDataOutputStream sums; 371 private static final float CHKSUM_AS_FRACTION = 0.01f; 372 private boolean isClosed = false; 373 374 public ChecksumFSOutputSummer(ChecksumFileSystem fs, 375 Path file, 376 boolean overwrite, 377 int bufferSize, 378 short replication, 379 long blockSize, 380 Progressable progress) 381 throws IOException { 382 super(new PureJavaCrc32(), fs.getBytesPerSum(), 4); 383 int bytesPerSum = fs.getBytesPerSum(); 384 this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 385 replication, blockSize, progress); 386 int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize); 387 this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 388 sumBufferSize, replication, 389 blockSize); 390 sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length); 391 sums.writeInt(bytesPerSum); 392 } 393 394 @Override 395 public void close() throws IOException { 396 try { 397 flushBuffer(); 398 sums.close(); 399 datas.close(); 400 } finally { 401 isClosed = true; 402 } 403 } 404 405 @Override 406 protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) 407 throws IOException { 408 datas.write(b, offset, len); 409 sums.write(checksum); 410 } 411 412 @Override 413 protected void checkClosed() throws IOException { 414 if (isClosed) { 415 throw new ClosedChannelException(); 416 } 417 } 418 } 419 420 @Override 421 public FSDataOutputStream create(Path f, FsPermission permission, 422 boolean overwrite, int bufferSize, short replication, long blockSize, 423 Progressable progress) throws IOException { 424 return create(f, permission, overwrite, true, bufferSize, 425 replication, blockSize, progress); 426 } 427 428 private FSDataOutputStream create(Path f, FsPermission permission, 429 boolean overwrite, boolean createParent, int bufferSize, 430 short replication, long blockSize, 431 Progressable progress) throws IOException { 432 Path parent = f.getParent(); 433 if (parent != null) { 434 if (!createParent && !exists(parent)) { 435 throw new FileNotFoundException("Parent directory doesn't exist: " 436 + parent); 437 } else if (!mkdirs(parent)) { 438 throw new IOException("Mkdirs failed to create " + parent); 439 } 440 } 441 final FSDataOutputStream out; 442 if (writeChecksum) { 443 out = new FSDataOutputStream( 444 new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication, 445 blockSize, progress), null); 446 } else { 447 out = fs.create(f, permission, overwrite, bufferSize, replication, 448 blockSize, progress); 449 // remove the checksum file since we aren't writing one 450 Path checkFile = getChecksumFile(f); 451 if (fs.exists(checkFile)) { 452 fs.delete(checkFile, true); 453 } 454 } 455 if (permission != null) { 456 setPermission(f, permission); 457 } 458 return out; 459 } 460 461 @Override 462 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 463 boolean overwrite, int bufferSize, short replication, long blockSize, 464 Progressable progress) throws IOException { 465 return create(f, permission, overwrite, false, bufferSize, replication, 466 blockSize, progress); 467 } 468 469 /** 470 * Set replication for an existing file. 471 * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt> 472 * @param src file name 473 * @param replication new replication 474 * @throws IOException 475 * @return true if successful; 476 * false if file does not exist or is a directory 477 */ 478 @Override 479 public boolean setReplication(Path src, short replication) throws IOException { 480 boolean value = fs.setReplication(src, replication); 481 if (!value) 482 return false; 483 484 Path checkFile = getChecksumFile(src); 485 if (exists(checkFile)) 486 fs.setReplication(checkFile, replication); 487 488 return true; 489 } 490 491 /** 492 * Rename files/dirs 493 */ 494 @Override 495 public boolean rename(Path src, Path dst) throws IOException { 496 if (fs.isDirectory(src)) { 497 return fs.rename(src, dst); 498 } else { 499 if (fs.isDirectory(dst)) { 500 dst = new Path(dst, src.getName()); 501 } 502 503 boolean value = fs.rename(src, dst); 504 if (!value) 505 return false; 506 507 Path srcCheckFile = getChecksumFile(src); 508 Path dstCheckFile = getChecksumFile(dst); 509 if (fs.exists(srcCheckFile)) { //try to rename checksum 510 value = fs.rename(srcCheckFile, dstCheckFile); 511 } else if (fs.exists(dstCheckFile)) { 512 // no src checksum, so remove dst checksum 513 value = fs.delete(dstCheckFile, true); 514 } 515 516 return value; 517 } 518 } 519 520 /** 521 * Implement the delete(Path, boolean) in checksum 522 * file system. 523 */ 524 @Override 525 public boolean delete(Path f, boolean recursive) throws IOException{ 526 FileStatus fstatus = null; 527 try { 528 fstatus = fs.getFileStatus(f); 529 } catch(FileNotFoundException e) { 530 return false; 531 } 532 if (fstatus.isDirectory()) { 533 //this works since the crcs are in the same 534 //directories and the files. so we just delete 535 //everything in the underlying filesystem 536 return fs.delete(f, recursive); 537 } else { 538 Path checkFile = getChecksumFile(f); 539 if (fs.exists(checkFile)) { 540 fs.delete(checkFile, true); 541 } 542 return fs.delete(f, true); 543 } 544 } 545 546 final private static PathFilter DEFAULT_FILTER = new PathFilter() { 547 @Override 548 public boolean accept(Path file) { 549 return !isChecksumFile(file); 550 } 551 }; 552 553 /** 554 * List the statuses of the files/directories in the given path if the path is 555 * a directory. 556 * 557 * @param f 558 * given path 559 * @return the statuses of the files/directories in the given patch 560 * @throws IOException 561 */ 562 @Override 563 public FileStatus[] listStatus(Path f) throws IOException { 564 return fs.listStatus(f, DEFAULT_FILTER); 565 } 566 567 /** 568 * List the statuses of the files/directories in the given path if the path is 569 * a directory. 570 * 571 * @param f 572 * given path 573 * @return the statuses of the files/directories in the given patch 574 * @throws IOException 575 */ 576 @Override 577 public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) 578 throws IOException { 579 return fs.listLocatedStatus(f, DEFAULT_FILTER); 580 } 581 582 @Override 583 public boolean mkdirs(Path f) throws IOException { 584 return fs.mkdirs(f); 585 } 586 587 @Override 588 public void copyFromLocalFile(boolean delSrc, Path src, Path dst) 589 throws IOException { 590 Configuration conf = getConf(); 591 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf); 592 } 593 594 /** 595 * The src file is under FS, and the dst is on the local disk. 596 * Copy it from FS control to the local dst name. 597 */ 598 @Override 599 public void copyToLocalFile(boolean delSrc, Path src, Path dst) 600 throws IOException { 601 Configuration conf = getConf(); 602 FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf); 603 } 604 605 /** 606 * The src file is under FS, and the dst is on the local disk. 607 * Copy it from FS control to the local dst name. 608 * If src and dst are directories, the copyCrc parameter 609 * determines whether to copy CRC files. 610 */ 611 public void copyToLocalFile(Path src, Path dst, boolean copyCrc) 612 throws IOException { 613 if (!fs.isDirectory(src)) { // source is a file 614 fs.copyToLocalFile(src, dst); 615 FileSystem localFs = getLocal(getConf()).getRawFileSystem(); 616 if (localFs.isDirectory(dst)) { 617 dst = new Path(dst, src.getName()); 618 } 619 dst = getChecksumFile(dst); 620 if (localFs.exists(dst)) { //remove old local checksum file 621 localFs.delete(dst, true); 622 } 623 Path checksumFile = getChecksumFile(src); 624 if (copyCrc && fs.exists(checksumFile)) { //copy checksum file 625 fs.copyToLocalFile(checksumFile, dst); 626 } 627 } else { 628 FileStatus[] srcs = listStatus(src); 629 for (FileStatus srcFile : srcs) { 630 copyToLocalFile(srcFile.getPath(), 631 new Path(dst, srcFile.getPath().getName()), copyCrc); 632 } 633 } 634 } 635 636 @Override 637 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 638 throws IOException { 639 return tmpLocalFile; 640 } 641 642 @Override 643 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) 644 throws IOException { 645 moveFromLocalFile(tmpLocalFile, fsOutputFile); 646 } 647 648 /** 649 * Report a checksum error to the file system. 650 * @param f the file name containing the error 651 * @param in the stream open on the file 652 * @param inPos the position of the beginning of the bad data in the file 653 * @param sums the stream open on the checksum file 654 * @param sumsPos the position of the beginning of the bad data in the checksum file 655 * @return if retry is neccessary 656 */ 657 public boolean reportChecksumFailure(Path f, FSDataInputStream in, 658 long inPos, FSDataInputStream sums, long sumsPos) { 659 return false; 660 } 661 }