001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs;
020
021import java.io.EOFException;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.channels.ClosedChannelException;
026import java.util.Arrays;
027import java.util.List;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.permission.AclEntry;
033import org.apache.hadoop.fs.permission.FsPermission;
034import org.apache.hadoop.util.DataChecksum;
035import org.apache.hadoop.util.Progressable;
036
/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums on the client side.
 *
 *****************************************************************/
044@InterfaceAudience.Public
045@InterfaceStability.Stable
046public abstract class ChecksumFileSystem extends FilterFileSystem {
047  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
048  private int bytesPerChecksum = 512;
049  private boolean verifyChecksum = true;
050  private boolean writeChecksum = true;
051
052  public static double getApproxChkSumLength(long size) {
053    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
054  }
055  
056  public ChecksumFileSystem(FileSystem fs) {
057    super(fs);
058  }
059
060  @Override
061  public void setConf(Configuration conf) {
062    super.setConf(conf);
063    if (conf != null) {
064      bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
065                                     LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
066    }
067  }
068  
069  /**
070   * Set whether to verify checksum.
071   */
072  @Override
073  public void setVerifyChecksum(boolean verifyChecksum) {
074    this.verifyChecksum = verifyChecksum;
075  }
076
077  @Override
078  public void setWriteChecksum(boolean writeChecksum) {
079    this.writeChecksum = writeChecksum;
080  }
081  
082  /** get the raw file system */
083  @Override
084  public FileSystem getRawFileSystem() {
085    return fs;
086  }
087
088  /** Return the name of the checksum file associated with a file.*/
089  public Path getChecksumFile(Path file) {
090    return new Path(file.getParent(), "." + file.getName() + ".crc");
091  }
092
093  /** Return true iff file is a checksum file name.*/
094  public static boolean isChecksumFile(Path file) {
095    String name = file.getName();
096    return name.startsWith(".") && name.endsWith(".crc");
097  }
098
099  /** Return the length of the checksum file given the size of the 
100   * actual file.
101   **/
102  public long getChecksumFileLength(Path file, long fileSize) {
103    return getChecksumLength(fileSize, getBytesPerSum());
104  }
105
106  /** Return the bytes Per Checksum */
107  public int getBytesPerSum() {
108    return bytesPerChecksum;
109  }
110
111  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
112    int defaultBufferSize = getConf().getInt(
113                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
114                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
115    int proportionalBufferSize = bufferSize / bytesPerSum;
116    return Math.max(bytesPerSum,
117                    Math.max(proportionalBufferSize, defaultBufferSize));
118  }
119
120  /*******************************************************
121   * For open()'s FSInputStream
122   * It verifies that data matches checksums.
123   *******************************************************/
124  private static class ChecksumFSInputChecker extends FSInputChecker {
125    private ChecksumFileSystem fs;
126    private FSDataInputStream datas;
127    private FSDataInputStream sums;
128    
129    private static final int HEADER_LENGTH = 8;
130    
131    private int bytesPerSum = 1;
132    
133    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
134      throws IOException {
135      this(fs, file, fs.getConf().getInt(
136                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
137                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
138    }
139    
140    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
141      throws IOException {
142      super( file, fs.getFileStatus(file).getReplication() );
143      this.datas = fs.getRawFileSystem().open(file, bufferSize);
144      this.fs = fs;
145      Path sumFile = fs.getChecksumFile(file);
146      try {
147        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
148        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
149
150        byte[] version = new byte[CHECKSUM_VERSION.length];
151        sums.readFully(version);
152        if (!Arrays.equals(version, CHECKSUM_VERSION))
153          throw new IOException("Not a checksum file: "+sumFile);
154        this.bytesPerSum = sums.readInt();
155        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
156      } catch (IOException e) {
157        // mincing the message is terrible, but java throws permission
158        // exceptions as FNF because that's all the method signatures allow!
159        if (!(e instanceof FileNotFoundException) ||
160            e.getMessage().endsWith(" (Permission denied)")) {
161          LOG.warn("Problem opening checksum file: "+ file +
162              ".  Ignoring exception: " , e);
163        }
164        set(fs.verifyChecksum, null, 1, 0);
165      }
166    }
167    
168    private long getChecksumFilePos( long dataPos ) {
169      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
170    }
171    
172    @Override
173    protected long getChunkPosition( long dataPos ) {
174      return dataPos/bytesPerSum*bytesPerSum;
175    }
176    
177    @Override
178    public int available() throws IOException {
179      return datas.available() + super.available();
180    }
181    
182    @Override
183    public int read(long position, byte[] b, int off, int len)
184      throws IOException {
185      // parameter check
186      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
187        throw new IndexOutOfBoundsException();
188      } else if (len == 0) {
189        return 0;
190      }
191      if( position<0 ) {
192        throw new IllegalArgumentException(
193            "Parameter position can not to be negative");
194      }
195
196      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
197      checker.seek(position);
198      int nread = checker.read(b, off, len);
199      checker.close();
200      return nread;
201    }
202    
203    @Override
204    public void close() throws IOException {
205      datas.close();
206      if( sums != null ) {
207        sums.close();
208      }
209      set(fs.verifyChecksum, null, 1, 0);
210    }
211    
212
213    @Override
214    public boolean seekToNewSource(long targetPos) throws IOException {
215      long sumsPos = getChecksumFilePos(targetPos);
216      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
217      boolean newDataSource = datas.seekToNewSource(targetPos);
218      return sums.seekToNewSource(sumsPos) || newDataSource;
219    }
220
221    @Override
222    protected int readChunk(long pos, byte[] buf, int offset, int len,
223        byte[] checksum) throws IOException {
224
225      boolean eof = false;
226      if (needChecksum()) {
227        assert checksum != null; // we have a checksum buffer
228        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
229        assert len >= bytesPerSum; // we must read at least one chunk
230
231        final int checksumsToRead = Math.min(
232          len/bytesPerSum, // number of checksums based on len to read
233          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
234        long checksumPos = getChecksumFilePos(pos); 
235        if(checksumPos != sums.getPos()) {
236          sums.seek(checksumPos);
237        }
238
239        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
240        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
241          throw new ChecksumException(
242            "Checksum file not a length multiple of checksum size " +
243            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
244            " sumLenread: " + sumLenRead,
245            pos);
246        }
247        if (sumLenRead <= 0) { // we're at the end of the file
248          eof = true;
249        } else {
250          // Adjust amount of data to read based on how many checksum chunks we read
251          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
252        }
253      }
254      if(pos != datas.getPos()) {
255        datas.seek(pos);
256      }
257      int nread = readFully(datas, buf, offset, len);
258      if (eof && nread > 0) {
259        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
260      }
261      return nread;
262    }
263  }
264  
265  private static class FSDataBoundedInputStream extends FSDataInputStream {
266    private FileSystem fs;
267    private Path file;
268    private long fileLen = -1L;
269
270    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
271      super(in);
272      this.fs = fs;
273      this.file = file;
274    }
275    
276    @Override
277    public boolean markSupported() {
278      return false;
279    }
280    
281    /* Return the file length */
282    private long getFileLength() throws IOException {
283      if( fileLen==-1L ) {
284        fileLen = fs.getContentSummary(file).getLength();
285      }
286      return fileLen;
287    }
288    
289    /**
290     * Skips over and discards <code>n</code> bytes of data from the
291     * input stream.
292     *
293     *The <code>skip</code> method skips over some smaller number of bytes
294     * when reaching end of file before <code>n</code> bytes have been skipped.
295     * The actual number of bytes skipped is returned.  If <code>n</code> is
296     * negative, no bytes are skipped.
297     *
298     * @param      n   the number of bytes to be skipped.
299     * @return     the actual number of bytes skipped.
300     * @exception  IOException  if an I/O error occurs.
301     *             ChecksumException if the chunk to skip to is corrupted
302     */
303    @Override
304    public synchronized long skip(long n) throws IOException {
305      long curPos = getPos();
306      long fileLength = getFileLength();
307      if( n+curPos > fileLength ) {
308        n = fileLength - curPos;
309      }
310      return super.skip(n);
311    }
312    
313    /**
314     * Seek to the given position in the stream.
315     * The next read() will be from that position.
316     * 
317     * <p>This method does not allow seek past the end of the file.
318     * This produces IOException.
319     *
320     * @param      pos   the postion to seek to.
321     * @exception  IOException  if an I/O error occurs or seeks after EOF
322     *             ChecksumException if the chunk to seek to is corrupted
323     */
324
325    @Override
326    public synchronized void seek(long pos) throws IOException {
327      if (pos > getFileLength()) {
328        throw new EOFException("Cannot seek after EOF");
329      }
330      super.seek(pos);
331    }
332
333  }
334
335  /**
336   * Opens an FSDataInputStream at the indicated Path.
337   * @param f the file name to open
338   * @param bufferSize the size of the buffer to be used.
339   */
340  @Override
341  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
342    FileSystem fs;
343    InputStream in;
344    if (verifyChecksum) {
345      fs = this;
346      in = new ChecksumFSInputChecker(this, f, bufferSize);
347    } else {
348      fs = getRawFileSystem();
349      in = fs.open(f, bufferSize);
350    }
351    return new FSDataBoundedInputStream(fs, f, in);
352  }
353
354  @Override
355  public FSDataOutputStream append(Path f, int bufferSize,
356      Progressable progress) throws IOException {
357    throw new IOException("Not supported");
358  }
359
360  @Override
361  public boolean truncate(Path f, long newLength) throws IOException {
362    throw new IOException("Not supported");
363  }
364
365  /**
366   * Calculated the length of the checksum file in bytes.
367   * @param size the length of the data file in bytes
368   * @param bytesPerSum the number of bytes in a checksum block
369   * @return the number of bytes in the checksum file
370   */
371  public static long getChecksumLength(long size, int bytesPerSum) {
372    //the checksum length is equal to size passed divided by bytesPerSum +
373    //bytes written in the beginning of the checksum file.  
374    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
375             CHECKSUM_VERSION.length + 4;  
376  }
377
378  /** This class provides an output stream for a checksummed file.
379   * It generates checksums for data. */
380  private static class ChecksumFSOutputSummer extends FSOutputSummer {
381    private FSDataOutputStream datas;    
382    private FSDataOutputStream sums;
383    private static final float CHKSUM_AS_FRACTION = 0.01f;
384    private boolean isClosed = false;
385    
386    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
387                          Path file, 
388                          boolean overwrite,
389                          int bufferSize,
390                          short replication,
391                          long blockSize,
392                          Progressable progress,
393                          FsPermission permission)
394      throws IOException {
395      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
396          fs.getBytesPerSum()));
397      int bytesPerSum = fs.getBytesPerSum();
398      this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
399                                         bufferSize, replication, blockSize,
400                                         progress);
401      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
402      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
403                                               permission, true, sumBufferSize,
404                                               replication, blockSize, null);
405      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
406      sums.writeInt(bytesPerSum);
407    }
408    
409    @Override
410    public void close() throws IOException {
411      try {
412        flushBuffer();
413        sums.close();
414        datas.close();
415      } finally {
416        isClosed = true;
417      }
418    }
419    
420    @Override
421    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
422        int ckoff, int cklen)
423    throws IOException {
424      datas.write(b, offset, len);
425      sums.write(checksum, ckoff, cklen);
426    }
427
428    @Override
429    protected void checkClosed() throws IOException {
430      if (isClosed) {
431        throw new ClosedChannelException();
432      }
433    }
434  }
435
436  @Override
437  public FSDataOutputStream create(Path f, FsPermission permission,
438      boolean overwrite, int bufferSize, short replication, long blockSize,
439      Progressable progress) throws IOException {
440    return create(f, permission, overwrite, true, bufferSize,
441        replication, blockSize, progress);
442  }
443
444  private FSDataOutputStream create(Path f, FsPermission permission,
445      boolean overwrite, boolean createParent, int bufferSize,
446      short replication, long blockSize,
447      Progressable progress) throws IOException {
448    Path parent = f.getParent();
449    if (parent != null) {
450      if (!createParent && !exists(parent)) {
451        throw new FileNotFoundException("Parent directory doesn't exist: "
452            + parent);
453      } else if (!mkdirs(parent)) {
454        throw new IOException("Mkdirs failed to create " + parent
455            + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
456            + ")");
457      }
458    }
459    final FSDataOutputStream out;
460    if (writeChecksum) {
461      out = new FSDataOutputStream(
462          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
463              blockSize, progress, permission), null);
464    } else {
465      out = fs.create(f, permission, overwrite, bufferSize, replication,
466          blockSize, progress);
467      // remove the checksum file since we aren't writing one
468      Path checkFile = getChecksumFile(f);
469      if (fs.exists(checkFile)) {
470        fs.delete(checkFile, true);
471      }
472    }
473    return out;
474  }
475
476  @Override
477  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
478      boolean overwrite, int bufferSize, short replication, long blockSize,
479      Progressable progress) throws IOException {
480    return create(f, permission, overwrite, false, bufferSize, replication,
481        blockSize, progress);
482  }
483
484  abstract class FsOperation {
485    boolean run(Path p) throws IOException {
486      boolean status = apply(p);
487      if (status) {
488        Path checkFile = getChecksumFile(p);
489        if (fs.exists(checkFile)) {
490          apply(checkFile);
491        }
492      }
493      return status;
494    }
495    abstract boolean apply(Path p) throws IOException;
496  }
497
498
499  @Override
500  public void setPermission(Path src, final FsPermission permission)
501      throws IOException {
502    new FsOperation(){
503      @Override
504      boolean apply(Path p) throws IOException {
505        fs.setPermission(p, permission);
506        return true;
507      }
508    }.run(src);
509  }
510
511  @Override
512  public void setOwner(Path src, final String username, final String groupname)
513      throws IOException {
514    new FsOperation(){
515      @Override
516      boolean apply(Path p) throws IOException {
517        fs.setOwner(p, username, groupname);
518        return true;
519      }
520    }.run(src);
521  }
522
523  @Override
524  public void setAcl(Path src, final List<AclEntry> aclSpec)
525      throws IOException {
526    new FsOperation(){
527      @Override
528      boolean apply(Path p) throws IOException {
529        fs.setAcl(p, aclSpec);
530        return true;
531      }
532    }.run(src);
533  }
534
535  @Override
536  public void modifyAclEntries(Path src, final List<AclEntry> aclSpec)
537      throws IOException {
538    new FsOperation(){
539      @Override
540      boolean apply(Path p) throws IOException {
541        fs.modifyAclEntries(p, aclSpec);
542        return true;
543      }
544    }.run(src);
545  }
546
547  @Override
548  public void removeAcl(Path src) throws IOException {
549    new FsOperation(){
550      @Override
551      boolean apply(Path p) throws IOException {
552        fs.removeAcl(p);
553        return true;
554      }
555    }.run(src);
556  }
557
558  @Override
559  public void removeAclEntries(Path src, final List<AclEntry> aclSpec)
560      throws IOException {
561    new FsOperation(){
562      @Override
563      boolean apply(Path p) throws IOException {
564        fs.removeAclEntries(p, aclSpec);
565        return true;
566      }
567    }.run(src);
568  }
569
570  @Override
571  public void removeDefaultAcl(Path src) throws IOException {
572    new FsOperation(){
573      @Override
574      boolean apply(Path p) throws IOException {
575        fs.removeDefaultAcl(p);
576        return true;
577      }
578    }.run(src);
579  }
580
581  /**
582   * Set replication for an existing file.
583   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
584   * @param src file name
585   * @param replication new replication
586   * @throws IOException
587   * @return true if successful;
588   *         false if file does not exist or is a directory
589   */
590  @Override
591  public boolean setReplication(Path src, final short replication)
592      throws IOException {
593    return new FsOperation(){
594      @Override
595      boolean apply(Path p) throws IOException {
596        return fs.setReplication(p, replication);
597      }
598    }.run(src);
599  }
600
601  /**
602   * Rename files/dirs
603   */
604  @Override
605  public boolean rename(Path src, Path dst) throws IOException {
606    if (fs.isDirectory(src)) {
607      return fs.rename(src, dst);
608    } else {
609      if (fs.isDirectory(dst)) {
610        dst = new Path(dst, src.getName());
611      }
612
613      boolean value = fs.rename(src, dst);
614      if (!value)
615        return false;
616
617      Path srcCheckFile = getChecksumFile(src);
618      Path dstCheckFile = getChecksumFile(dst);
619      if (fs.exists(srcCheckFile)) { //try to rename checksum
620        value = fs.rename(srcCheckFile, dstCheckFile);
621      } else if (fs.exists(dstCheckFile)) {
622        // no src checksum, so remove dst checksum
623        value = fs.delete(dstCheckFile, true); 
624      }
625
626      return value;
627    }
628  }
629
630  /**
631   * Implement the delete(Path, boolean) in checksum
632   * file system.
633   */
634  @Override
635  public boolean delete(Path f, boolean recursive) throws IOException{
636    FileStatus fstatus = null;
637    try {
638      fstatus = fs.getFileStatus(f);
639    } catch(FileNotFoundException e) {
640      return false;
641    }
642    if (fstatus.isDirectory()) {
643      //this works since the crcs are in the same
644      //directories and the files. so we just delete
645      //everything in the underlying filesystem
646      return fs.delete(f, recursive);
647    } else {
648      Path checkFile = getChecksumFile(f);
649      if (fs.exists(checkFile)) {
650        fs.delete(checkFile, true);
651      }
652      return fs.delete(f, true);
653    }
654  }
655    
656  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
657    @Override
658    public boolean accept(Path file) {
659      return !isChecksumFile(file);
660    }
661  };
662
663  /**
664   * List the statuses of the files/directories in the given path if the path is
665   * a directory.
666   * 
667   * @param f
668   *          given path
669   * @return the statuses of the files/directories in the given path
670   * @throws IOException
671   */
672  @Override
673  public FileStatus[] listStatus(Path f) throws IOException {
674    return fs.listStatus(f, DEFAULT_FILTER);
675  }
676  
677  /**
678   * List the statuses of the files/directories in the given path if the path is
679   * a directory.
680   * 
681   * @param f
682   *          given path
683   * @return the statuses of the files/directories in the given patch
684   * @throws IOException
685   */
686  @Override
687  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
688  throws IOException {
689    return fs.listLocatedStatus(f, DEFAULT_FILTER);
690  }
691  
692  @Override
693  public boolean mkdirs(Path f) throws IOException {
694    return fs.mkdirs(f);
695  }
696
697  @Override
698  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
699    throws IOException {
700    Configuration conf = getConf();
701    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
702  }
703
704  /**
705   * The src file is under FS, and the dst is on the local disk.
706   * Copy it from FS control to the local dst name.
707   */
708  @Override
709  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
710    throws IOException {
711    Configuration conf = getConf();
712    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
713  }
714
715  /**
716   * The src file is under FS, and the dst is on the local disk.
717   * Copy it from FS control to the local dst name.
718   * If src and dst are directories, the copyCrc parameter
719   * determines whether to copy CRC files.
720   */
721  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
722    throws IOException {
723    if (!fs.isDirectory(src)) { // source is a file
724      fs.copyToLocalFile(src, dst);
725      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
726      if (localFs.isDirectory(dst)) {
727        dst = new Path(dst, src.getName());
728      }
729      dst = getChecksumFile(dst);
730      if (localFs.exists(dst)) { //remove old local checksum file
731        localFs.delete(dst, true);
732      }
733      Path checksumFile = getChecksumFile(src);
734      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
735        fs.copyToLocalFile(checksumFile, dst);
736      }
737    } else {
738      FileStatus[] srcs = listStatus(src);
739      for (FileStatus srcFile : srcs) {
740        copyToLocalFile(srcFile.getPath(), 
741                        new Path(dst, srcFile.getPath().getName()), copyCrc);
742      }
743    }
744  }
745
746  @Override
747  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
748    throws IOException {
749    return tmpLocalFile;
750  }
751
752  @Override
753  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
754    throws IOException {
755    moveFromLocalFile(tmpLocalFile, fsOutputFile);
756  }
757
758  /**
759   * Report a checksum error to the file system.
760   * @param f the file name containing the error
761   * @param in the stream open on the file
762   * @param inPos the position of the beginning of the bad data in the file
763   * @param sums the stream open on the checksum file
764   * @param sumsPos the position of the beginning of the bad data in the checksum file
765   * @return if retry is neccessary
766   */
767  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
768                                       long inPos, FSDataInputStream sums, long sumsPos) {
769    return false;
770  }
771}