/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;

/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ChecksumFileSystem extends FilterFileSystem {
  /** Magic/version bytes written at the start of every checksum file. */
  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
  private int bytesPerChecksum = 512;
  private boolean verifyChecksum = true;
  private boolean writeChecksum = true;

  /**
   * Estimate the checksum length for a given amount of data.
   *
   * @param size the data size in bytes
   * @return {@code size} scaled by the fixed checksum fraction
   */
  public static double getApproxChkSumLength(long size) {
    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
  }

  /**
   * Wrap the given file system.
   *
   * @param fs the raw file system that stores data and checksum files
   */
  public ChecksumFileSystem(FileSystem fs) {
    super(fs);
  }

  /**
   * Set the configuration and, when it is non-null, refresh the
   * bytes-per-checksum setting from it.
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf != null) {
      bytesPerChecksum = conf.getInt(
          LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
          LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
    }
  }

  /**
   * Set whether to verify checksum.
   */
  @Override
  public void setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
  }

  /**
   * Set whether to write a checksum file when creating files.
   */
  @Override
  public void setWriteChecksum(boolean writeChecksum) {
    this.writeChecksum = writeChecksum;
  }

  /** get the raw file system */
  @Override
  public FileSystem getRawFileSystem() {
    return fs;
  }

  /** Return the name of the checksum file associated with a file.*/
  public Path getChecksumFile(Path file) {
    return new Path(file.getParent(), "." + file.getName() + ".crc");
  }

  /** Return true iff file is a checksum file name.*/
  public static boolean isChecksumFile(Path file) {
    String name = file.getName();
    return name.startsWith(".") && name.endsWith(".crc");
  }

  /**
   * Return the length of the checksum file given the size of the
   * actual file.
   *
   * @param file the data file (not used by this implementation)
   * @param fileSize the length of the data file in bytes
   * @return the length of the corresponding checksum file in bytes
   */
  public long getChecksumFileLength(Path file, long fileSize) {
    return getChecksumLength(fileSize, getBytesPerSum());
  }

  /** Return the bytes Per Checksum */
  public int getBytesPerSum() {
    return bytesPerChecksum;
  }

  /**
   * Choose the buffer size for a checksum stream: the largest of
   * {@code bytesPerSum}, {@code bufferSize / bytesPerSum} and the configured
   * stream buffer size.
   */
  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
    int defaultBufferSize = getConf().getInt(
        LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
        LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
    int proportionalBufferSize = bufferSize / bytesPerSum;
    return Math.max(bytesPerSum,
                    Math.max(proportionalBufferSize, defaultBufferSize));
  }

  /*******************************************************
   * For open()'s FSInputStream
   * It verifies that data matches checksums.
   *******************************************************/
  private static class ChecksumFSInputChecker extends FSInputChecker {
    private ChecksumFileSystem fs;
    private FSDataInputStream datas;
    private FSDataInputStream sums;

    /** Bytes preceding the first checksum: version bytes plus an int. */
    private static final int HEADER_LENGTH = 8;

    private int bytesPerSum = 1;

    /**
     * Open a checker on {@code file} using the configured stream buffer size.
     */
    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
      throws IOException {
      this(fs, file, fs.getConf().getInt(
          LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
          LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
    }

    /**
     * Open the data file and, if possible, its checksum file. When the
     * checksum file cannot be opened or has a bad header, the checker is
     * configured without a checksum instead of failing.
     */
    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file,
        int bufferSize) throws IOException {
      super(file, fs.getFileStatus(file).getReplication());
      this.datas = fs.getRawFileSystem().open(file, bufferSize);
      this.fs = fs;
      Path sumFile = fs.getChecksumFile(file);
      try {
        int sumBufferSize =
            fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);

        byte[] version = new byte[CHECKSUM_VERSION.length];
        sums.readFully(version);
        if (!Arrays.equals(version, CHECKSUM_VERSION)) {
          throw new IOException("Not a checksum file: "+sumFile);
        }
        this.bytesPerSum = sums.readInt();
        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
      } catch (IOException e) {
        // mincing the message is terrible, but java throws permission
        // exceptions as FNF because that's all the method signatures allow!
        // The message may be null, so guard before inspecting it.
        String message = e.getMessage();
        boolean permissionDenied =
            message != null && message.endsWith(" (Permission denied)");
        if (!(e instanceof FileNotFoundException) || permissionDenied) {
          LOG.warn("Problem opening checksum file: "+ file +
                   ".  Ignoring exception: " , e);
        }
        set(fs.verifyChecksum, null, 1, 0);
      }
    }

    /** Map a data-file offset to the offset of its checksum entry. */
    private long getChecksumFilePos(long dataPos) {
      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
    }

    /** Round a data-file offset down to the start of its checksum chunk. */
    @Override
    protected long getChunkPosition(long dataPos) {
      return dataPos/bytesPerSum*bytesPerSum;
    }

    @Override
    public int available() throws IOException {
      return datas.available() + super.available();
    }

    /**
     * Positioned read. Uses a separate, temporary checker on the same file,
     * so this stream's own position is not moved.
     */
    @Override
    public int read(long position, byte[] b, int off, int len)
      throws IOException {
      // parameter check
      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
        throw new IndexOutOfBoundsException();
      } else if (len == 0) {
        return 0;
      }
      if (position < 0) {
        throw new IllegalArgumentException(
            "Parameter position can not to be negative");
      }

      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
      try {
        checker.seek(position);
        return checker.read(b, off, len);
      } finally {
        // Always release the temporary streams, even when seek/read fails.
        checker.close();
      }
    }

    /**
     * Close the data stream and the checksum stream (if one was opened).
     * The checksum stream is closed even if closing the data stream fails.
     */
    @Override
    public void close() throws IOException {
      try {
        datas.close();
      } finally {
        try {
          if (sums != null) {
            sums.close();
          }
        } finally {
          set(fs.verifyChecksum, null, 1, 0);
        }
      }
    }

    /**
     * Report the checksum failure to the file system, then ask both the data
     * and checksum streams to switch to a new source.
     *
     * @return true if either stream found a new source
     */
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      long sumsPos = getChecksumFilePos(targetPos);
      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
      boolean newDataSource = datas.seekToNewSource(targetPos);
      return sums.seekToNewSource(sumsPos) || newDataSource;
    }

    /**
     * Read data starting at {@code pos} into {@code buf} and, when checksums
     * are needed, the matching checksums into {@code checksum}.
     *
     * @return the number of data bytes read
     * @throws ChecksumException if the checksum file is truncated to a
     *         non-multiple of the checksum size, or if data exists beyond the
     *         end of the checksum file
     */
    @Override
    protected int readChunk(long pos, byte[] buf, int offset, int len,
        byte[] checksum) throws IOException {

      boolean eof = false;
      if (needChecksum()) {
        assert checksum != null; // we have a checksum buffer
        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
        assert len >= bytesPerSum; // we must read at least one chunk

        final int checksumsToRead = Math.min(
          len/bytesPerSum, // number of checksums based on len to read
          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
        long checksumPos = getChecksumFilePos(pos);
        if (checksumPos != sums.getPos()) {
          sums.seek(checksumPos);
        }

        int sumLenRead =
            sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
          throw new ChecksumException(
            "Checksum file not a length multiple of checksum size " +
            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
            " sumLenread: " + sumLenRead,
            pos);
        }
        if (sumLenRead <= 0) { // we're at the end of the file
          eof = true;
        } else {
          // Adjust amount of data to read based on how many checksum chunks
          // we read
          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
        }
      }
      if (pos != datas.getPos()) {
        datas.seek(pos);
      }
      int nread = readFully(datas, buf, offset, len);
      if (eof && nread > 0) {
        // data without a corresponding checksum entry
        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
      }
      return nread;
    }
  }

  /**
   * An input stream that refuses to seek past, and clamps skips at, the end
   * of the file.
   */
  private static class FSDataBoundedInputStream extends FSDataInputStream {
    private FileSystem fs;
    private Path file;
    /** Cached file length; -1 until first looked up. */
    private long fileLen = -1L;

    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
      super(in);
      this.fs = fs;
      this.file = file;
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    /* Return the file length */
    private long getFileLength() throws IOException {
      if (fileLen == -1L) {
        fileLen = fs.getContentSummary(file).getLength();
      }
      return fileLen;
    }

    /**
     * Skips over and discards <code>n</code> bytes of data from the
     * input stream.
     *
     * The <code>skip</code> method skips over some smaller number of bytes
     * when reaching end of file before <code>n</code> bytes have been skipped.
     * The actual number of bytes skipped is returned.  If <code>n</code> is
     * negative, no bytes are skipped.
     *
     * @param      n   the number of bytes to be skipped.
     * @return     the actual number of bytes skipped.
     * @exception  IOException  if an I/O error occurs.
     *             ChecksumException if the chunk to skip to is corrupted
     */
    @Override
    public synchronized long skip(long n) throws IOException {
      long curPos = getPos();
      long fileLength = getFileLength();
      // Compare against the remaining byte count rather than computing
      // n + curPos, which overflows for very large n and defeats the clamp.
      long remaining = fileLength - curPos;
      if (n > remaining) {
        n = remaining;
      }
      return super.skip(n);
    }

    /**
     * Seek to the given position in the stream.
     * The next read() will be from that position.
     *
     * <p>This method does not allow seek past the end of the file.
     * This produces IOException.
     *
     * @param      pos   the position to seek to.
     * @exception  IOException  if an I/O error occurs or seeks after EOF
     *             ChecksumException if the chunk to seek to is corrupted
     */
    @Override
    public synchronized void seek(long pos) throws IOException {
      if (pos > getFileLength()) {
        throw new EOFException("Cannot seek after EOF");
      }
      super.seek(pos);
    }

  }

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * When checksum verification is enabled the stream verifies data against
   * the checksum file; otherwise the raw file system's stream is used.
   *
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileSystem fs;
    InputStream in;
    if (verifyChecksum) {
      fs = this;
      in = new ChecksumFSInputChecker(this, f, bufferSize);
    } else {
      fs = getRawFileSystem();
      in = fs.open(f, bufferSize);
    }
    return new FSDataBoundedInputStream(fs, f, in);
  }

  /**
   * Append is not supported.
   *
   * @throws IOException always
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Truncate is not supported.
   *
   * @throws IOException always
   */
  @Override
  public boolean truncate(Path f, long newLength) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Calculate the length of the checksum file in bytes.
   * @param size the length of the data file in bytes
   * @param bytesPerSum the number of bytes in a checksum block
   * @return the number of bytes in the checksum file
   */
  public static long getChecksumLength(long size, int bytesPerSum) {
    //the checksum length is equal to size passed divided by bytesPerSum +
    //bytes written in the beginning of the checksum file.
    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
             CHECKSUM_VERSION.length + 4;
  }

  /** This class provides an output stream for a checksummed file.
   * It generates checksums for data. */
  private static class ChecksumFSOutputSummer extends FSOutputSummer {
    private FSDataOutputStream datas;
    private FSDataOutputStream sums;
    private static final float CHKSUM_AS_FRACTION = 0.01f;
    private boolean isClosed = false;

    /**
     * Create the data file and its checksum file, and write the checksum
     * file header (version bytes followed by bytes-per-checksum).
     * If setting up the checksum file fails, any stream opened so far is
     * closed before the exception propagates.
     */
    public ChecksumFSOutputSummer(ChecksumFileSystem fs,
                          Path file,
                          boolean overwrite,
                          int bufferSize,
                          short replication,
                          long blockSize,
                          Progressable progress,
                          FsPermission permission)
      throws IOException {
      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
          fs.getBytesPerSum()));
      int bytesPerSum = fs.getBytesPerSum();
      this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
                                         bufferSize, replication, blockSize,
                                         progress);
      boolean success = false;
      try {
        int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
        this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
                                                 permission, true,
                                                 sumBufferSize, replication,
                                                 blockSize, null);
        sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
        sums.writeInt(bytesPerSum);
        success = true;
      } finally {
        if (!success) {
          // Do not leak open streams when construction fails part-way.
          closeQuietly(sums);
          closeQuietly(datas);
        }
      }
    }

    /** Best-effort close used only on the constructor's failure path. */
    private static void closeQuietly(FSDataOutputStream stream) {
      if (stream == null) {
        return;
      }
      try {
        stream.close();
      } catch (IOException ignored) {
        // The original failure is the one worth reporting.
      }
    }

    /**
     * Flush buffered data, then close the checksum stream and the data
     * stream. Each close is attempted even if an earlier step fails, and the
     * summer is marked closed in every case.
     */
    @Override
    public void close() throws IOException {
      try {
        try {
          flushBuffer();
        } finally {
          try {
            sums.close();
          } finally {
            datas.close();
          }
        }
      } finally {
        isClosed = true;
      }
    }

    /** Write one chunk of data and its checksum to their streams. */
    @Override
    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
        int ckoff, int cklen)
    throws IOException {
      datas.write(b, offset, len);
      sums.write(checksum, ckoff, cklen);
    }

    @Override
    protected void checkClosed() throws IOException {
      if (isClosed) {
        throw new ClosedChannelException();
      }
    }
  }

  /**
   * Create a file, creating missing parent directories.
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, true, bufferSize,
        replication, blockSize, progress);
  }

  /**
   * Shared implementation of {@code create} and {@code createNonRecursive}.
   *
   * @param createParent if false, fail with FileNotFoundException when the
   *        parent directory does not exist
   */
  private FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, boolean createParent, int bufferSize,
      short replication, long blockSize,
      Progressable progress) throws IOException {
    Path parent = f.getParent();
    if (parent != null) {
      if (!createParent && !exists(parent)) {
        throw new FileNotFoundException("Parent directory doesn't exist: "
            + parent);
      } else if (!mkdirs(parent)) {
        throw new IOException("Mkdirs failed to create " + parent
            + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
            + ")");
      }
    }
    final FSDataOutputStream out;
    if (writeChecksum) {
      out = new FSDataOutputStream(
          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize,
              replication, blockSize, progress, permission), null);
    } else {
      out = fs.create(f, permission, overwrite, bufferSize, replication,
          blockSize, progress);
      // remove the checksum file since we aren't writing one
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
    }
    return out;
  }

  /**
   * Create a file, failing if the parent directory does not exist.
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, false, bufferSize, replication,
        blockSize, progress);
  }

  /**
   * An operation applied to a path and, when the operation reports success
   * and a checksum file exists, to that path's checksum file as well.
   */
  abstract class FsOperation {
    boolean run(Path p) throws IOException {
      boolean status = apply(p);
      if (status) {
        Path checkFile = getChecksumFile(p);
        if (fs.exists(checkFile)) {
          apply(checkFile);
        }
      }
      return status;
    }
    abstract boolean apply(Path p) throws IOException;
  }


  @Override
  public void setPermission(Path src, final FsPermission permission)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.setPermission(p, permission);
        return true;
      }
    }.run(src);
  }

  @Override
  public void setOwner(Path src, final String username, final String groupname)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.setOwner(p, username, groupname);
        return true;
      }
    }.run(src);
  }

  @Override
  public void setAcl(Path src, final List<AclEntry> aclSpec)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.setAcl(p, aclSpec);
        return true;
      }
    }.run(src);
  }

  @Override
  public void modifyAclEntries(Path src, final List<AclEntry> aclSpec)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.modifyAclEntries(p, aclSpec);
        return true;
      }
    }.run(src);
  }

  @Override
  public void removeAcl(Path src) throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.removeAcl(p);
        return true;
      }
    }.run(src);
  }

  @Override
  public void removeAclEntries(Path src, final List<AclEntry> aclSpec)
      throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.removeAclEntries(p, aclSpec);
        return true;
      }
    }.run(src);
  }

  @Override
  public void removeDefaultAcl(Path src) throws IOException {
    new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        fs.removeDefaultAcl(p);
        return true;
      }
    }.run(src);
  }

  /**
   * Set replication for an existing file.
   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
   * @param src file name
   * @param replication new replication
   * @throws IOException
   * @return true if successful;
   *         false if file does not exist or is a directory
   */
  @Override
  public boolean setReplication(Path src, final short replication)
      throws IOException {
    return new FsOperation(){
      @Override
      boolean apply(Path p) throws IOException {
        return fs.setReplication(p, replication);
      }
    }.run(src);
  }

  /**
   * Rename files/dirs.
   * For a file, the checksum file is renamed along with it; if the source
   * has no checksum file, an existing destination checksum file is removed.
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    if (fs.isDirectory(src)) {
      return fs.rename(src, dst);
    } else {
      if (fs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }

      boolean value = fs.rename(src, dst);
      if (!value) {
        return false;
      }

      Path srcCheckFile = getChecksumFile(src);
      Path dstCheckFile = getChecksumFile(dst);
      if (fs.exists(srcCheckFile)) { //try to rename checksum
        value = fs.rename(srcCheckFile, dstCheckFile);
      } else if (fs.exists(dstCheckFile)) {
        // no src checksum, so remove dst checksum
        value = fs.delete(dstCheckFile, true);
      }

      return value;
    }
  }

  /**
   * Implement the delete(Path, boolean) in checksum
   * file system.
   *
   * @return false if the path does not exist; otherwise the result of the
   *         underlying delete
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException{
    FileStatus fstatus = null;
    try {
      fstatus = fs.getFileStatus(f);
    } catch(FileNotFoundException e) {
      return false;
    }
    if (fstatus.isDirectory()) {
      //this works since the crcs are in the same
      //directories and the files. so we just delete
      //everything in the underlying filesystem
      return fs.delete(f, recursive);
    } else {
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
      return fs.delete(f, true);
    }
  }

  /** Filter that hides checksum files from directory listings. */
  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path file) {
      return !isChecksumFile(file);
    }
  };

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are filtered out.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    return fs.listStatus(f, DEFAULT_FILTER);
  }

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are filtered out.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
  throws IOException {
    return fs.listLocatedStatus(f, DEFAULT_FILTER);
  }

  @Override
  public boolean mkdirs(Path f) throws IOException {
    return fs.mkdirs(f);
  }

  /**
   * Copy {@code src} from the local file system to {@code dst} on this
   * file system.
   */
  @Override
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   */
  @Override
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   * If src and dst are directories, the copyCrc parameter
   * determines whether to copy CRC files.
   */
  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
    throws IOException {
    if (!fs.isDirectory(src)) { // source is a file
      fs.copyToLocalFile(src, dst);
      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
      if (localFs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }
      dst = getChecksumFile(dst);
      if (localFs.exists(dst)) { //remove old local checksum file
        localFs.delete(dst, true);
      }
      Path checksumFile = getChecksumFile(src);
      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
        fs.copyToLocalFile(checksumFile, dst);
      }
    } else {
      FileStatus[] srcs = listStatus(src);
      for (FileStatus srcFile : srcs) {
        copyToLocalFile(srcFile.getPath(),
                        new Path(dst, srcFile.getPath().getName()), copyCrc);
      }
    }
  }

  /**
   * Return the local temporary file to write to; this implementation
   * returns {@code tmpLocalFile} unchanged.
   */
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }

  /**
   * Move the completed local temporary file to its final location on this
   * file system.
   */
  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }

  /**
   * Report a checksum error to the file system.
   * This base implementation does nothing and returns false.
   *
   * @param f the file name containing the error
   * @param in the stream open on the file
   * @param inPos the position of the beginning of the bad data in the file
   * @param sums the stream open on the checksum file
   * @param sumsPos the position of the beginning of the bad data in the checksum file
   * @return if retry is necessary
   */
  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
                                       long inPos, FSDataInputStream sums, long sumsPos) {
    return false;
  }
}