001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs; 020 021import java.io.*; 022import java.util.Arrays; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026import org.apache.hadoop.classification.InterfaceAudience; 027import org.apache.hadoop.classification.InterfaceStability; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.permission.FsPermission; 030import org.apache.hadoop.util.Progressable; 031import org.apache.hadoop.util.PureJavaCrc32; 032 033/**************************************************************** 034 * Abstract Checksumed FileSystem. 035 * It provide a basice implementation of a Checksumed FileSystem, 036 * which creates a checksum file for each raw file. 037 * It generates & verifies checksums at the client side. 038 * 039 *****************************************************************/ 040@InterfaceAudience.Public 041@InterfaceStability.Stable 042public abstract class ChecksumFileSystem extends FilterFileSystem { 043 private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0}; 044 private int bytesPerChecksum = 512; 045 private boolean verifyChecksum = true; 046 private boolean writeChecksum = true; 047 048 public static double getApproxChkSumLength(long size) { 049 return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size; 050 } 051 052 public ChecksumFileSystem(FileSystem fs) { 053 super(fs); 054 } 055 056 public void setConf(Configuration conf) { 057 super.setConf(conf); 058 if (conf != null) { 059 bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY, 060 LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT); 061 } 062 } 063 064 /** 065 * Set whether to verify checksum. 066 */ 067 public void setVerifyChecksum(boolean verifyChecksum) { 068 this.verifyChecksum = verifyChecksum; 069 } 070 071 @Override 072 public void setWriteChecksum(boolean writeChecksum) { 073 this.writeChecksum = writeChecksum; 074 } 075 076 /** get the raw file system */ 077 public FileSystem getRawFileSystem() { 078 return fs; 079 } 080 081 /** Return the name of the checksum file associated with a file.*/ 082 public Path getChecksumFile(Path file) { 083 return new Path(file.getParent(), "." + file.getName() + ".crc"); 084 } 085 086 /** Return true iff file is a checksum file name.*/ 087 public static boolean isChecksumFile(Path file) { 088 String name = file.getName(); 089 return name.startsWith(".") && name.endsWith(".crc"); 090 } 091 092 /** Return the length of the checksum file given the size of the 093 * actual file. 094 **/ 095 public long getChecksumFileLength(Path file, long fileSize) { 096 return getChecksumLength(fileSize, getBytesPerSum()); 097 } 098 099 /** Return the bytes Per Checksum */ 100 public int getBytesPerSum() { 101 return bytesPerChecksum; 102 } 103 104 private int getSumBufferSize(int bytesPerSum, int bufferSize) { 105 int defaultBufferSize = getConf().getInt( 106 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 107 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT); 108 int proportionalBufferSize = bufferSize / bytesPerSum; 109 return Math.max(bytesPerSum, 110 Math.max(proportionalBufferSize, defaultBufferSize)); 111 } 112 113 /******************************************************* 114 * For open()'s FSInputStream 115 * It verifies that data matches checksums. 116 *******************************************************/ 117 private static class ChecksumFSInputChecker extends FSInputChecker { 118 public static final Log LOG 119 = LogFactory.getLog(FSInputChecker.class); 120 121 private ChecksumFileSystem fs; 122 private FSDataInputStream datas; 123 private FSDataInputStream sums; 124 125 private static final int HEADER_LENGTH = 8; 126 127 private int bytesPerSum = 1; 128 129 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file) 130 throws IOException { 131 this(fs, file, fs.getConf().getInt( 132 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 133 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT)); 134 } 135 136 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) 137 throws IOException { 138 super( file, fs.getFileStatus(file).getReplication() ); 139 this.datas = fs.getRawFileSystem().open(file, bufferSize); 140 this.fs = fs; 141 Path sumFile = fs.getChecksumFile(file); 142 try { 143 int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize); 144 sums = fs.getRawFileSystem().open(sumFile, sumBufferSize); 145 146 byte[] version = new byte[CHECKSUM_VERSION.length]; 147 sums.readFully(version); 148 if (!Arrays.equals(version, CHECKSUM_VERSION)) 149 throw new IOException("Not a checksum file: "+sumFile); 150 this.bytesPerSum = sums.readInt(); 151 set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4); 152 } catch (FileNotFoundException e) { // quietly ignore 153 set(fs.verifyChecksum, null, 1, 0); 154 } catch (IOException e) { // loudly ignore 155 LOG.warn("Problem opening checksum file: "+ file + 156 ". Ignoring exception: " , e); 157 set(fs.verifyChecksum, null, 1, 0); 158 } 159 } 160 161 private long getChecksumFilePos( long dataPos ) { 162 return HEADER_LENGTH + 4*(dataPos/bytesPerSum); 163 } 164 165 protected long getChunkPosition( long dataPos ) { 166 return dataPos/bytesPerSum*bytesPerSum; 167 } 168 169 public int available() throws IOException { 170 return datas.available() + super.available(); 171 } 172 173 public int read(long position, byte[] b, int off, int len) 174 throws IOException { 175 // parameter check 176 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { 177 throw new IndexOutOfBoundsException(); 178 } else if (len == 0) { 179 return 0; 180 } 181 if( position<0 ) { 182 throw new IllegalArgumentException( 183 "Parameter position can not to be negative"); 184 } 185 186 ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file); 187 checker.seek(position); 188 int nread = checker.read(b, off, len); 189 checker.close(); 190 return nread; 191 } 192 193 public void close() throws IOException { 194 datas.close(); 195 if( sums != null ) { 196 sums.close(); 197 } 198 set(fs.verifyChecksum, null, 1, 0); 199 } 200 201 202 @Override 203 public boolean seekToNewSource(long targetPos) throws IOException { 204 long sumsPos = getChecksumFilePos(targetPos); 205 fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos); 206 boolean newDataSource = datas.seekToNewSource(targetPos); 207 return sums.seekToNewSource(sumsPos) || newDataSource; 208 } 209 210 @Override 211 protected int readChunk(long pos, byte[] buf, int offset, int len, 212 byte[] checksum) throws IOException { 213 214 boolean eof = false; 215 if (needChecksum()) { 216 assert checksum != null; // we have a checksum buffer 217 assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length 218 assert len >= bytesPerSum; // we must read at least one chunk 219 220 final int checksumsToRead = Math.min( 221 len/bytesPerSum, // number of checksums based on len to read 222 checksum.length / CHECKSUM_SIZE); // size of checksum buffer 223 long checksumPos = getChecksumFilePos(pos); 224 if(checksumPos != sums.getPos()) { 225 sums.seek(checksumPos); 226 } 227 228 int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead); 229 if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) { 230 throw new ChecksumException( 231 "Checksum file not a length multiple of checksum size " + 232 "in " + file + " at " + pos + " checksumpos: " + checksumPos + 233 " sumLenread: " + sumLenRead, 234 pos); 235 } 236 if (sumLenRead <= 0) { // we're at the end of the file 237 eof = true; 238 } else { 239 // Adjust amount of data to read based on how many checksum chunks we read 240 len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE)); 241 } 242 } 243 if(pos != datas.getPos()) { 244 datas.seek(pos); 245 } 246 int nread = readFully(datas, buf, offset, len); 247 if (eof && nread > 0) { 248 throw new ChecksumException("Checksum error: "+file+" at "+pos, pos); 249 } 250 return nread; 251 } 252 } 253 254 private static class FSDataBoundedInputStream extends FSDataInputStream { 255 private FileSystem fs; 256 private Path file; 257 private long fileLen = -1L; 258 259 FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) 260 throws IOException { 261 super(in); 262 this.fs = fs; 263 this.file = file; 264 } 265 266 @Override 267 public boolean markSupported() { 268 return false; 269 } 270 271 /* Return the file length */ 272 private long getFileLength() throws IOException { 273 if( fileLen==-1L ) { 274 fileLen = fs.getContentSummary(file).getLength(); 275 } 276 return fileLen; 277 } 278 279 /** 280 * Skips over and discards <code>n</code> bytes of data from the 281 * input stream. 282 * 283 *The <code>skip</code> method skips over some smaller number of bytes 284 * when reaching end of file before <code>n</code> bytes have been skipped. 285 * The actual number of bytes skipped is returned. If <code>n</code> is 286 * negative, no bytes are skipped. 287 * 288 * @param n the number of bytes to be skipped. 289 * @return the actual number of bytes skipped. 290 * @exception IOException if an I/O error occurs. 291 * ChecksumException if the chunk to skip to is corrupted 292 */ 293 public synchronized long skip(long n) throws IOException { 294 long curPos = getPos(); 295 long fileLength = getFileLength(); 296 if( n+curPos > fileLength ) { 297 n = fileLength - curPos; 298 } 299 return super.skip(n); 300 } 301 302 /** 303 * Seek to the given position in the stream. 304 * The next read() will be from that position. 305 * 306 * <p>This method does not allow seek past the end of the file. 307 * This produces IOException. 308 * 309 * @param pos the postion to seek to. 310 * @exception IOException if an I/O error occurs or seeks after EOF 311 * ChecksumException if the chunk to seek to is corrupted 312 */ 313 314 public synchronized void seek(long pos) throws IOException { 315 if(pos>getFileLength()) { 316 throw new IOException("Cannot seek after EOF"); 317 } 318 super.seek(pos); 319 } 320 321 } 322 323 /** 324 * Opens an FSDataInputStream at the indicated Path. 325 * @param f the file name to open 326 * @param bufferSize the size of the buffer to be used. 327 */ 328 @Override 329 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 330 FileSystem fs; 331 InputStream in; 332 if (verifyChecksum) { 333 fs = this; 334 in = new ChecksumFSInputChecker(this, f, bufferSize); 335 } else { 336 fs = getRawFileSystem(); 337 in = fs.open(f, bufferSize); 338 } 339 return new FSDataBoundedInputStream(fs, f, in); 340 } 341 342 /** {@inheritDoc} */ 343 public FSDataOutputStream append(Path f, int bufferSize, 344 Progressable progress) throws IOException { 345 throw new IOException("Not supported"); 346 } 347 348 /** 349 * Calculated the length of the checksum file in bytes. 350 * @param size the length of the data file in bytes 351 * @param bytesPerSum the number of bytes in a checksum block 352 * @return the number of bytes in the checksum file 353 */ 354 public static long getChecksumLength(long size, int bytesPerSum) { 355 //the checksum length is equal to size passed divided by bytesPerSum + 356 //bytes written in the beginning of the checksum file. 357 return ((size + bytesPerSum - 1) / bytesPerSum) * 4 + 358 CHECKSUM_VERSION.length + 4; 359 } 360 361 /** This class provides an output stream for a checksummed file. 362 * It generates checksums for data. */ 363 private static class ChecksumFSOutputSummer extends FSOutputSummer { 364 private FSDataOutputStream datas; 365 private FSDataOutputStream sums; 366 private static final float CHKSUM_AS_FRACTION = 0.01f; 367 368 public ChecksumFSOutputSummer(ChecksumFileSystem fs, 369 Path file, 370 boolean overwrite, 371 short replication, 372 long blockSize, 373 Configuration conf) 374 throws IOException { 375 this(fs, file, overwrite, 376 conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 377 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT), 378 replication, blockSize, null); 379 } 380 381 public ChecksumFSOutputSummer(ChecksumFileSystem fs, 382 Path file, 383 boolean overwrite, 384 int bufferSize, 385 short replication, 386 long blockSize, 387 Progressable progress) 388 throws IOException { 389 super(new PureJavaCrc32(), fs.getBytesPerSum(), 4); 390 int bytesPerSum = fs.getBytesPerSum(); 391 this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 392 replication, blockSize, progress); 393 int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize); 394 this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 395 sumBufferSize, replication, 396 blockSize); 397 sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length); 398 sums.writeInt(bytesPerSum); 399 } 400 401 public void close() throws IOException { 402 flushBuffer(); 403 sums.close(); 404 datas.close(); 405 } 406 407 @Override 408 protected void writeChunk(byte[] b, int offset, int len, byte[] checksum) 409 throws IOException { 410 datas.write(b, offset, len); 411 sums.write(checksum); 412 } 413 } 414 415 /** {@inheritDoc} */ 416 @Override 417 public FSDataOutputStream create(Path f, FsPermission permission, 418 boolean overwrite, int bufferSize, short replication, long blockSize, 419 Progressable progress) throws IOException { 420 return create(f, permission, overwrite, true, bufferSize, 421 replication, blockSize, progress); 422 } 423 424 private FSDataOutputStream create(Path f, FsPermission permission, 425 boolean overwrite, boolean createParent, int bufferSize, 426 short replication, long blockSize, 427 Progressable progress) throws IOException { 428 Path parent = f.getParent(); 429 if (parent != null) { 430 if (!createParent && !exists(parent)) { 431 throw new FileNotFoundException("Parent directory doesn't exist: " 432 + parent); 433 } else if (!mkdirs(parent)) { 434 throw new IOException("Mkdirs failed to create " + parent); 435 } 436 } 437 final FSDataOutputStream out; 438 if (writeChecksum) { 439 out = new FSDataOutputStream( 440 new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication, 441 blockSize, progress), null); 442 } else { 443 out = fs.create(f, permission, overwrite, bufferSize, replication, 444 blockSize, progress); 445 // remove the checksum file since we aren't writing one 446 Path checkFile = getChecksumFile(f); 447 if (fs.exists(checkFile)) { 448 fs.delete(checkFile, true); 449 } 450 } 451 if (permission != null) { 452 setPermission(f, permission); 453 } 454 return out; 455 } 456 457 /** {@inheritDoc} */ 458 @Override 459 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 460 boolean overwrite, int bufferSize, short replication, long blockSize, 461 Progressable progress) throws IOException { 462 return create(f, permission, overwrite, false, bufferSize, replication, 463 blockSize, progress); 464 } 465 466 /** 467 * Set replication for an existing file. 468 * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt> 469 * @param src file name 470 * @param replication new replication 471 * @throws IOException 472 * @return true if successful; 473 * false if file does not exist or is a directory 474 */ 475 public boolean setReplication(Path src, short replication) throws IOException { 476 boolean value = fs.setReplication(src, replication); 477 if (!value) 478 return false; 479 480 Path checkFile = getChecksumFile(src); 481 if (exists(checkFile)) 482 fs.setReplication(checkFile, replication); 483 484 return true; 485 } 486 487 /** 488 * Rename files/dirs 489 */ 490 public boolean rename(Path src, Path dst) throws IOException { 491 if (fs.isDirectory(src)) { 492 return fs.rename(src, dst); 493 } else { 494 if (fs.isDirectory(dst)) { 495 dst = new Path(dst, src.getName()); 496 } 497 498 boolean value = fs.rename(src, dst); 499 if (!value) 500 return false; 501 502 Path srcCheckFile = getChecksumFile(src); 503 Path dstCheckFile = getChecksumFile(dst); 504 if (fs.exists(srcCheckFile)) { //try to rename checksum 505 value = fs.rename(srcCheckFile, dstCheckFile); 506 } else if (fs.exists(dstCheckFile)) { 507 // no src checksum, so remove dst checksum 508 value = fs.delete(dstCheckFile, true); 509 } 510 511 return value; 512 } 513 } 514 515 /** 516 * Implement the delete(Path, boolean) in checksum 517 * file system. 518 */ 519 public boolean delete(Path f, boolean recursive) throws IOException{ 520 FileStatus fstatus = null; 521 try { 522 fstatus = fs.getFileStatus(f); 523 } catch(FileNotFoundException e) { 524 return false; 525 } 526 if (fstatus.isDirectory()) { 527 //this works since the crcs are in the same 528 //directories and the files. so we just delete 529 //everything in the underlying filesystem 530 return fs.delete(f, recursive); 531 } else { 532 Path checkFile = getChecksumFile(f); 533 if (fs.exists(checkFile)) { 534 fs.delete(checkFile, true); 535 } 536 return fs.delete(f, true); 537 } 538 } 539 540 final private static PathFilter DEFAULT_FILTER = new PathFilter() { 541 public boolean accept(Path file) { 542 return !isChecksumFile(file); 543 } 544 }; 545 546 /** 547 * List the statuses of the files/directories in the given path if the path is 548 * a directory. 549 * 550 * @param f 551 * given path 552 * @return the statuses of the files/directories in the given patch 553 * @throws IOException 554 */ 555 @Override 556 public FileStatus[] listStatus(Path f) throws IOException { 557 return fs.listStatus(f, DEFAULT_FILTER); 558 } 559 560 /** 561 * List the statuses of the files/directories in the given path if the path is 562 * a directory. 563 * 564 * @param f 565 * given path 566 * @return the statuses of the files/directories in the given patch 567 * @throws IOException 568 */ 569 @Override 570 public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) 571 throws IOException { 572 return fs.listLocatedStatus(f, DEFAULT_FILTER); 573 } 574 575 @Override 576 public boolean mkdirs(Path f) throws IOException { 577 return fs.mkdirs(f); 578 } 579 580 @Override 581 public void copyFromLocalFile(boolean delSrc, Path src, Path dst) 582 throws IOException { 583 Configuration conf = getConf(); 584 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf); 585 } 586 587 /** 588 * The src file is under FS, and the dst is on the local disk. 589 * Copy it from FS control to the local dst name. 590 */ 591 @Override 592 public void copyToLocalFile(boolean delSrc, Path src, Path dst) 593 throws IOException { 594 Configuration conf = getConf(); 595 FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf); 596 } 597 598 /** 599 * The src file is under FS, and the dst is on the local disk. 600 * Copy it from FS control to the local dst name. 601 * If src and dst are directories, the copyCrc parameter 602 * determines whether to copy CRC files. 603 */ 604 public void copyToLocalFile(Path src, Path dst, boolean copyCrc) 605 throws IOException { 606 if (!fs.isDirectory(src)) { // source is a file 607 fs.copyToLocalFile(src, dst); 608 FileSystem localFs = getLocal(getConf()).getRawFileSystem(); 609 if (localFs.isDirectory(dst)) { 610 dst = new Path(dst, src.getName()); 611 } 612 dst = getChecksumFile(dst); 613 if (localFs.exists(dst)) { //remove old local checksum file 614 localFs.delete(dst, true); 615 } 616 Path checksumFile = getChecksumFile(src); 617 if (copyCrc && fs.exists(checksumFile)) { //copy checksum file 618 fs.copyToLocalFile(checksumFile, dst); 619 } 620 } else { 621 FileStatus[] srcs = listStatus(src); 622 for (FileStatus srcFile : srcs) { 623 copyToLocalFile(srcFile.getPath(), 624 new Path(dst, srcFile.getPath().getName()), copyCrc); 625 } 626 } 627 } 628 629 @Override 630 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 631 throws IOException { 632 return tmpLocalFile; 633 } 634 635 @Override 636 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) 637 throws IOException { 638 moveFromLocalFile(tmpLocalFile, fsOutputFile); 639 } 640 641 /** 642 * Report a checksum error to the file system. 643 * @param f the file name containing the error 644 * @param in the stream open on the file 645 * @param inPos the position of the beginning of the bad data in the file 646 * @param sums the stream open on the checksum file 647 * @param sumsPos the position of the beginning of the bad data in the checksum file 648 * @return if retry is neccessary 649 */ 650 public boolean reportChecksumFailure(Path f, FSDataInputStream in, 651 long inPos, FSDataInputStream sums, long sumsPos) { 652 return false; 653 } 654}