/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.List;

import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;

/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ChecksumFileSystem extends FilterFileSystem {
  /** Magic bytes written at the start of every checksum file. */
  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
  private int bytesPerChecksum = 512;
  private boolean verifyChecksum = true;
  private boolean writeChecksum = true;

  /**
   * Return an approximate checksum length for a file of the given size,
   * computed as a fixed fraction of {@code size}.
   *
   * @param size the length of the data file in bytes
   * @return the approximate checksum length
   */
  public static double getApproxChkSumLength(long size) {
    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
  }

  public ChecksumFileSystem(FileSystem fs) {
    super(fs);
  }

  /**
   * Apply the configuration and, when it is non-null, read the number of
   * bytes per checksum from it.
   *
   * @param conf the configuration; may be null
   * @throws IllegalStateException if the configured bytes-per-checksum
   *         value is not positive
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf != null) {
      bytesPerChecksum = conf.getInt(
          LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
          LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
      Preconditions.checkState(bytesPerChecksum > 0,
          "bytes per checksum should be positive but was %s",
          bytesPerChecksum);
    }
  }

  /**
   * Set whether to verify checksum.
   */
  @Override
  public void setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
  }

  /**
   * Set whether to write a checksum file when creating files.
   */
  @Override
  public void setWriteChecksum(boolean writeChecksum) {
    this.writeChecksum = writeChecksum;
  }

  /** Get the raw file system. */
  @Override
  public FileSystem getRawFileSystem() {
    return fs;
  }

  /** Return the name of the checksum file associated with a file. */
  public Path getChecksumFile(Path file) {
    return new Path(file.getParent(), "." + file.getName() + ".crc");
  }

  /** Return true iff file is a checksum file name. */
  public static boolean isChecksumFile(Path file) {
    String name = file.getName();
    return name.startsWith(".") && name.endsWith(".crc");
  }

  /**
   * Return the length of the checksum file given the size of the
   * actual file.
   *
   * @param file the data file (not consulted by this implementation)
   * @param fileSize the length of the data file in bytes
   * @return the number of bytes in the checksum file
   */
  public long getChecksumFileLength(Path file, long fileSize) {
    return getChecksumLength(fileSize, getBytesPerSum());
  }

  /** Return the bytes per checksum. */
  public int getBytesPerSum() {
    return bytesPerChecksum;
  }

  /**
   * Pick the buffer size for a checksum stream: the largest of
   * {@code bytesPerSum}, {@code bufferSize / bytesPerSum} and the
   * configured stream buffer size.
   */
  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
    int defaultBufferSize = getConf().getInt(
        LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
        LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
    int proportionalBufferSize = bufferSize / bytesPerSum;
    return Math.max(bytesPerSum,
        Math.max(proportionalBufferSize, defaultBufferSize));
  }

  /*******************************************************
   * For open()'s FSInputStream.
   * It verifies that data matches checksums.
   *******************************************************/
  private static class ChecksumFSInputChecker extends FSInputChecker {
    private ChecksumFileSystem fs;
    private FSDataInputStream datas;
    /** Checksum stream; stays null when the checksum file cannot be opened. */
    private FSDataInputStream sums;

    /** Size of the checksum file header: version bytes plus an int. */
    private static final int HEADER_LENGTH = 8;

    private int bytesPerSum = 1;

    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
        throws IOException {
      this(fs, file, fs.getConf().getInt(
          LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
          LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
    }

    /**
     * Open the data file and, if possible, its checksum file. Any
     * IOException raised while opening or validating the checksum file is
     * handled here and the checker is then configured without a checksum.
     *
     * @param fs the checksum file system the file belongs to
     * @param file the data file to open
     * @param bufferSize buffer size for the data stream
     * @throws IOException if the data file cannot be opened
     */
    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file,
        int bufferSize) throws IOException {
      super(file, fs.getFileStatus(file).getReplication());
      this.datas = fs.getRawFileSystem().open(file, bufferSize);
      this.fs = fs;
      Path sumFile = fs.getChecksumFile(file);
      try {
        int sumBufferSize =
            fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);

        byte[] version = new byte[CHECKSUM_VERSION.length];
        sums.readFully(version);
        if (!Arrays.equals(version, CHECKSUM_VERSION)) {
          throw new IOException("Not a checksum file: "+sumFile);
        }
        this.bytesPerSum = sums.readInt();
        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
      } catch (IOException e) {
        // mincing the message is terrible, but java throws permission
        // exceptions as FNF because that's all the method signatures allow!
        // The message may be null, so guard before inspecting it.
        String message = e.getMessage();
        if (!(e instanceof FileNotFoundException) ||
            (message != null && message.endsWith(" (Permission denied)"))) {
          LOG.warn("Problem opening checksum file: "+ file +
                   ". Ignoring exception: " , e);
        }
        set(fs.verifyChecksum, null, 1, 0);
      }
    }

    /** Map a data-file offset to the offset of its checksum entry. */
    private long getChecksumFilePos(long dataPos) {
      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
    }

    /** Round a data-file offset down to the start of its chunk. */
    @Override
    protected long getChunkPosition(long dataPos) {
      return dataPos/bytesPerSum*bytesPerSum;
    }

    @Override
    public int available() throws IOException {
      return datas.available() + super.available();
    }

    /**
     * Positioned read. It is served by a separate, temporary checker opened
     * on the same file, so this stream's own position is not moved.
     */
    @Override
    public int read(long position, byte[] b, int off, int len)
        throws IOException {
      // parameter check
      validatePositionedReadArgs(position, b, off, len);
      if (len == 0) {
        return 0;
      }

      int nread;
      try (ChecksumFSInputChecker checker =
               new ChecksumFSInputChecker(fs, file)) {
        checker.seek(position);
        nread = checker.read(b, off, len);
      }
      return nread;
    }

    @Override
    public void close() throws IOException {
      try {
        datas.close();
      } finally {
        // Close the checksum stream even when closing the data stream fails.
        if (sums != null) {
          sums.close();
        }
      }
      set(fs.verifyChecksum, null, 1, 0);
    }

    /**
     * Report the checksum failure to the file system, then ask both the
     * data stream and the checksum stream (when one is open) to switch to
     * a new source.
     *
     * @return true if either stream found a new source
     */
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      long sumsPos = getChecksumFilePos(targetPos);
      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
      boolean newDataSource = datas.seekToNewSource(targetPos);
      // sums is null when the checksum file could not be opened.
      boolean newSumSource = sums != null && sums.seekToNewSource(sumsPos);
      return newSumSource || newDataSource;
    }

    /**
     * Read data starting at {@code pos} into {@code buf} and, when checksums
     * are needed, the matching checksums into {@code checksum}.
     *
     * @return the number of data bytes read
     * @throws ChecksumException if the checksum data read is not a multiple
     *         of the checksum size, or if data is read although the
     *         checksum file is already exhausted
     */
    @Override
    protected int readChunk(long pos, byte[] buf, int offset, int len,
        byte[] checksum) throws IOException {

      boolean eof = false;
      if (needChecksum()) {
        assert checksum != null; // we have a checksum buffer
        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
        assert len >= bytesPerSum; // we must read at least one chunk

        final int checksumsToRead = Math.min(
          len/bytesPerSum, // number of checksums based on len to read
          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
        long checksumPos = getChecksumFilePos(pos);
        if (checksumPos != sums.getPos()) {
          sums.seek(checksumPos);
        }

        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
          throw new ChecksumException(
            "Checksum file not a length multiple of checksum size " +
            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
            " sumLenread: " + sumLenRead,
            pos);
        }
        if (sumLenRead <= 0) { // we're at the end of the file
          eof = true;
        } else {
          // Adjust amount of data to read based on how many checksum chunks we read
          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
        }
      }
      if (pos != datas.getPos()) {
        datas.seek(pos);
      }
      int nread = readFully(datas, buf, offset, len);
      if (eof && nread > 0) {
        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
      }
      return nread;
    }
  }

  /** An input stream whose skip and seek are bounded by the file length. */
  private static class FSDataBoundedInputStream extends FSDataInputStream {
    private FileSystem fs;
    private Path file;
    /** File length, fetched on first use; -1 until then. */
    private long fileLen = -1L;

    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
      super(in);
      this.fs = fs;
      this.file = file;
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    /* Return the file length */
    private long getFileLength() throws IOException {
      if (fileLen == -1L) {
        fileLen = fs.getContentSummary(file).getLength();
      }
      return fileLen;
    }

    /**
     * Skips over and discards <code>n</code> bytes of data from the
     * input stream.
     *
     * The <code>skip</code> method skips over some smaller number of bytes
     * when reaching end of file before <code>n</code> bytes have been skipped.
     * The actual number of bytes skipped is returned.  If <code>n</code> is
     * negative, no bytes are skipped.
     *
     * @param      n   the number of bytes to be skipped.
     * @return     the actual number of bytes skipped.
     * @exception  IOException  if an I/O error occurs.
     *             ChecksumException if the chunk to skip to is corrupted
     */
    @Override
    public synchronized long skip(long n) throws IOException {
      long curPos = getPos();
      long fileLength = getFileLength();
      // Compare against the remaining length rather than computing
      // n + curPos, which overflows for very large n.
      long remaining = fileLength - curPos;
      if (n > remaining) {
        n = remaining;
      }
      return super.skip(n);
    }

    /**
     * Seek to the given position in the stream.
     * The next read() will be from that position.
     *
     * <p>This method does not allow seek past the end of the file.
     * This produces IOException.
     *
     * @param      pos   the position to seek to.
     * @exception  IOException  if an I/O error occurs or seeks after EOF
     *             ChecksumException if the chunk to seek to is corrupted
     */
    @Override
    public synchronized void seek(long pos) throws IOException {
      if (pos > getFileLength()) {
        throw new EOFException("Cannot seek after EOF");
      }
      super.seek(pos);
    }

  }

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * When checksum verification is enabled the stream verifies data against
   * the checksum file; otherwise the raw file system's stream is used.
   *
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @return a stream bounded by the file length
   * @throws IOException if the file cannot be opened
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileSystem fs;
    InputStream in;
    if (verifyChecksum) {
      fs = this;
      in = new ChecksumFSInputChecker(this, f, bufferSize);
    } else {
      fs = getRawFileSystem();
      in = fs.open(f, bufferSize);
    }
    return new FSDataBoundedInputStream(fs, f, in);
  }

  /**
   * Append is not supported by this file system.
   *
   * @throws IOException always
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Truncate is not supported by this file system.
   *
   * @throws IOException always
   */
  @Override
  public boolean truncate(Path f, long newLength) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Calculate the length of the checksum file in bytes.
   * @param size the length of the data file in bytes
   * @param bytesPerSum the number of bytes in a checksum block
   * @return the number of bytes in the checksum file
   */
  public static long getChecksumLength(long size, int bytesPerSum) {
    //the checksum length is equal to size passed divided by bytesPerSum +
    //bytes written in the beginning of the checksum file.
    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
             CHECKSUM_VERSION.length + 4;
  }

  /** This class provides an output stream for a checksummed file.
   * It generates checksums for data. */
  private static class ChecksumFSOutputSummer extends FSOutputSummer {
    private FSDataOutputStream datas;
    private FSDataOutputStream sums;
    private static final float CHKSUM_AS_FRACTION = 0.01f;
    private boolean isClosed = false;

    /**
     * Create the data file and its checksum file, and write the checksum
     * file header (version bytes followed by bytes-per-checksum).
     */
    public ChecksumFSOutputSummer(ChecksumFileSystem fs,
                          Path file,
                          boolean overwrite,
                          int bufferSize,
                          short replication,
                          long blockSize,
                          Progressable progress,
                          FsPermission permission)
      throws IOException {
      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
          fs.getBytesPerSum()));
      int bytesPerSum = fs.getBytesPerSum();
      this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
                                         bufferSize, replication, blockSize,
                                         progress);
      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
                                               permission, true, sumBufferSize,
                                               replication, blockSize, null);
      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
      sums.writeInt(bytesPerSum);
    }

    /**
     * Flush buffered data and close both streams. Both streams are closed
     * even if flushing or closing one of them fails.
     */
    @Override
    public void close() throws IOException {
      // Resources close in reverse declaration order: sums first, then
      // datas, matching the order used on the success path before.
      try (FSDataOutputStream dataOut = datas;
           FSDataOutputStream sumOut = sums) {
        flushBuffer();
      } finally {
        isClosed = true;
      }
    }

    @Override
    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
        int ckoff, int cklen)
    throws IOException {
      datas.write(b, offset, len);
      sums.write(checksum, ckoff, cklen);
    }

    @Override
    protected void checkClosed() throws IOException {
      if (isClosed) {
        throw new ClosedChannelException();
      }
    }
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, true, bufferSize,
        replication, blockSize, progress);
  }

  /**
   * Shared implementation of create and createNonRecursive.
   *
   * @param createParent if false, a missing parent directory is an error
   *        instead of being created
   * @throws FileNotFoundException if {@code createParent} is false and the
   *         parent directory does not exist
   * @throws IOException if the parent directory cannot be created or the
   *         file cannot be created
   */
  private FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, boolean createParent, int bufferSize,
      short replication, long blockSize,
      Progressable progress) throws IOException {
    Path parent = f.getParent();
    if (parent != null) {
      if (!createParent && !exists(parent)) {
        throw new FileNotFoundException("Parent directory doesn't exist: "
            + parent);
      } else if (!mkdirs(parent)) {
        throw new IOException("Mkdirs failed to create " + parent
            + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
            + ")");
      }
    }
    final FSDataOutputStream out;
    if (writeChecksum) {
      out = new FSDataOutputStream(
          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
              blockSize, progress, permission), null);
    } else {
      out = fs.create(f, permission, overwrite, bufferSize, replication,
          blockSize, progress);
      // remove the checksum file since we aren't writing one
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
    }
    return out;
  }

  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, false, bufferSize, replication,
        blockSize, progress);
  }

  /**
   * An operation applied to a path and, when it succeeds and a checksum
   * file exists for that path, to the checksum file as well.
   */
  abstract class FsOperation {
    boolean run(Path p) throws IOException {
      boolean status = apply(p);
      if (status) {
        Path checkFile = getChecksumFile(p);
        if (fs.exists(checkFile)) {
          apply(checkFile);
        }
      }
      return status;
    }
    abstract boolean apply(Path p) throws IOException;
  }


  @Override
  public void setPermission(Path src, final FsPermission permission)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.setPermission(p, permission);
        return true;
      }
    }.run(src);
  }

  @Override
  public void setOwner(Path src, final String username, final String groupname)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.setOwner(p, username, groupname);
        return true;
      }
    }.run(src);
  }

  @Override
  public void setAcl(Path src, final List<AclEntry> aclSpec)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.setAcl(p, aclSpec);
        return true;
      }
    }.run(src);
  }

  @Override
  public void modifyAclEntries(Path src, final List<AclEntry> aclSpec)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.modifyAclEntries(p, aclSpec);
        return true;
      }
    }.run(src);
  }

  @Override
  public void removeAcl(Path src) throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.removeAcl(p);
        return true;
      }
    }.run(src);
  }

  @Override
  public void removeAclEntries(Path src, final List<AclEntry> aclSpec)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.removeAclEntries(p, aclSpec);
        return true;
      }
    }.run(src);
  }

  @Override
  public void removeDefaultAcl(Path src) throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.removeDefaultAcl(p);
        return true;
      }
    }.run(src);
  }

  /**
   * Set replication for an existing file.
   * Implement the abstract {@code setReplication} of {@code FileSystem}
   * @param src file name
   * @param replication new replication
   * @throws IOException
   * @return true if successful;
   *         false if file does not exist or is a directory
   */
  @Override
  public boolean setReplication(Path src, final short replication)
      throws IOException {
    return new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        return fs.setReplication(p, replication);
      }
    }.run(src);
  }

  /**
   * Rename files/dirs. For a file, the checksum file is renamed along with
   * it; if the source has no checksum file, any checksum file at the
   * destination is deleted.
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    if (fs.isDirectory(src)) {
      return fs.rename(src, dst);
    } else {
      if (fs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }

      boolean value = fs.rename(src, dst);
      if (!value) {
        return false;
      }

      Path srcCheckFile = getChecksumFile(src);
      Path dstCheckFile = getChecksumFile(dst);
      if (fs.exists(srcCheckFile)) { //try to rename checksum
        value = fs.rename(srcCheckFile, dstCheckFile);
      } else if (fs.exists(dstCheckFile)) {
        // no src checksum, so remove dst checksum
        value = fs.delete(dstCheckFile, true);
      }

      return value;
    }
  }

  /**
   * Implement the delete(Path, boolean) in checksum
   * file system.
   *
   * @return false if the path does not exist; otherwise the result of the
   *         underlying delete
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException{
    FileStatus fstatus = null;
    try {
      fstatus = fs.getFileStatus(f);
    } catch(FileNotFoundException e) {
      return false;
    }
    if (fstatus.isDirectory()) {
      //this works since the crcs are in the same
      //directories and the files. so we just delete
      //everything in the underlying filesystem
      return fs.delete(f, recursive);
    } else {
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
      return fs.delete(f, true);
    }
  }

  /** Filter that hides checksum files from directory listings. */
  private static final PathFilter DEFAULT_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path file) {
      return !isChecksumFile(file);
    }
  };

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    return fs.listStatus(f, DEFAULT_FILTER);
  }

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
  throws IOException {
    return fs.listLocatedStatus(f, DEFAULT_FILTER);
  }

  @Override
  public boolean mkdirs(Path f) throws IOException {
    return fs.mkdirs(f);
  }

  @Override
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   */
  @Override
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   * If src and dst are directories, the copyCrc parameter
   * determines whether to copy CRC files.
   */
  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
    throws IOException {
    if (!fs.isDirectory(src)) { // source is a file
      fs.copyToLocalFile(src, dst);
      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
      if (localFs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }
      dst = getChecksumFile(dst);
      if (localFs.exists(dst)) { //remove old local checksum file
        localFs.delete(dst, true);
      }
      Path checksumFile = getChecksumFile(src);
      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
        fs.copyToLocalFile(checksumFile, dst);
      }
    } else {
      FileStatus[] srcs = listStatus(src);
      for (FileStatus srcFile : srcs) {
        copyToLocalFile(srcFile.getPath(),
                        new Path(dst, srcFile.getPath().getName()), copyCrc);
      }
    }
  }

  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }

  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }

  /**
   * Report a checksum error to the file system.
   * This implementation does nothing and returns false.
   *
   * @param f the file name containing the error
   * @param in the stream open on the file
   * @param inPos the position of the beginning of the bad data in the file
   * @param sums the stream open on the checksum file
   * @param sumsPos the position of the beginning of the bad data in the checksum file
   * @return if retry is necessary
   */
  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
                                       long inPos, FSDataInputStream sums, long sumsPos) {
    return false;
  }
}