001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs;
020
021import java.io.EOFException;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.channels.ClosedChannelException;
026import java.util.Arrays;
027
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.permission.FsPermission;
032import org.apache.hadoop.util.DataChecksum;
033import org.apache.hadoop.util.Progressable;
034
035/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
040 *
041 *****************************************************************/
042@InterfaceAudience.Public
043@InterfaceStability.Stable
044public abstract class ChecksumFileSystem extends FilterFileSystem {
045  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
046  private int bytesPerChecksum = 512;
047  private boolean verifyChecksum = true;
048  private boolean writeChecksum = true;
049
050  public static double getApproxChkSumLength(long size) {
051    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
052  }
053  
054  public ChecksumFileSystem(FileSystem fs) {
055    super(fs);
056  }
057
058  @Override
059  public void setConf(Configuration conf) {
060    super.setConf(conf);
061    if (conf != null) {
062      bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
063                                     LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
064    }
065  }
066  
067  /**
068   * Set whether to verify checksum.
069   */
070  @Override
071  public void setVerifyChecksum(boolean verifyChecksum) {
072    this.verifyChecksum = verifyChecksum;
073  }
074
075  @Override
076  public void setWriteChecksum(boolean writeChecksum) {
077    this.writeChecksum = writeChecksum;
078  }
079  
080  /** get the raw file system */
081  @Override
082  public FileSystem getRawFileSystem() {
083    return fs;
084  }
085
086  /** Return the name of the checksum file associated with a file.*/
087  public Path getChecksumFile(Path file) {
088    return new Path(file.getParent(), "." + file.getName() + ".crc");
089  }
090
091  /** Return true iff file is a checksum file name.*/
092  public static boolean isChecksumFile(Path file) {
093    String name = file.getName();
094    return name.startsWith(".") && name.endsWith(".crc");
095  }
096
097  /** Return the length of the checksum file given the size of the 
098   * actual file.
099   **/
100  public long getChecksumFileLength(Path file, long fileSize) {
101    return getChecksumLength(fileSize, getBytesPerSum());
102  }
103
104  /** Return the bytes Per Checksum */
105  public int getBytesPerSum() {
106    return bytesPerChecksum;
107  }
108
109  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
110    int defaultBufferSize = getConf().getInt(
111                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
112                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
113    int proportionalBufferSize = bufferSize / bytesPerSum;
114    return Math.max(bytesPerSum,
115                    Math.max(proportionalBufferSize, defaultBufferSize));
116  }
117
118  /*******************************************************
119   * For open()'s FSInputStream
120   * It verifies that data matches checksums.
121   *******************************************************/
122  private static class ChecksumFSInputChecker extends FSInputChecker {
123    private ChecksumFileSystem fs;
124    private FSDataInputStream datas;
125    private FSDataInputStream sums;
126    
127    private static final int HEADER_LENGTH = 8;
128    
129    private int bytesPerSum = 1;
130    
131    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
132      throws IOException {
133      this(fs, file, fs.getConf().getInt(
134                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
135                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
136    }
137    
138    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
139      throws IOException {
140      super( file, fs.getFileStatus(file).getReplication() );
141      this.datas = fs.getRawFileSystem().open(file, bufferSize);
142      this.fs = fs;
143      Path sumFile = fs.getChecksumFile(file);
144      try {
145        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
146        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
147
148        byte[] version = new byte[CHECKSUM_VERSION.length];
149        sums.readFully(version);
150        if (!Arrays.equals(version, CHECKSUM_VERSION))
151          throw new IOException("Not a checksum file: "+sumFile);
152        this.bytesPerSum = sums.readInt();
153        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
154      } catch (FileNotFoundException e) {         // quietly ignore
155        set(fs.verifyChecksum, null, 1, 0);
156      } catch (IOException e) {                   // loudly ignore
157        LOG.warn("Problem opening checksum file: "+ file + 
158                 ".  Ignoring exception: " , e); 
159        set(fs.verifyChecksum, null, 1, 0);
160      }
161    }
162    
163    private long getChecksumFilePos( long dataPos ) {
164      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
165    }
166    
167    @Override
168    protected long getChunkPosition( long dataPos ) {
169      return dataPos/bytesPerSum*bytesPerSum;
170    }
171    
172    @Override
173    public int available() throws IOException {
174      return datas.available() + super.available();
175    }
176    
177    @Override
178    public int read(long position, byte[] b, int off, int len)
179      throws IOException {
180      // parameter check
181      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
182        throw new IndexOutOfBoundsException();
183      } else if (len == 0) {
184        return 0;
185      }
186      if( position<0 ) {
187        throw new IllegalArgumentException(
188            "Parameter position can not to be negative");
189      }
190
191      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
192      checker.seek(position);
193      int nread = checker.read(b, off, len);
194      checker.close();
195      return nread;
196    }
197    
198    @Override
199    public void close() throws IOException {
200      datas.close();
201      if( sums != null ) {
202        sums.close();
203      }
204      set(fs.verifyChecksum, null, 1, 0);
205    }
206    
207
208    @Override
209    public boolean seekToNewSource(long targetPos) throws IOException {
210      long sumsPos = getChecksumFilePos(targetPos);
211      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
212      boolean newDataSource = datas.seekToNewSource(targetPos);
213      return sums.seekToNewSource(sumsPos) || newDataSource;
214    }
215
216    @Override
217    protected int readChunk(long pos, byte[] buf, int offset, int len,
218        byte[] checksum) throws IOException {
219
220      boolean eof = false;
221      if (needChecksum()) {
222        assert checksum != null; // we have a checksum buffer
223        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
224        assert len >= bytesPerSum; // we must read at least one chunk
225
226        final int checksumsToRead = Math.min(
227          len/bytesPerSum, // number of checksums based on len to read
228          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
229        long checksumPos = getChecksumFilePos(pos); 
230        if(checksumPos != sums.getPos()) {
231          sums.seek(checksumPos);
232        }
233
234        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
235        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
236          throw new ChecksumException(
237            "Checksum file not a length multiple of checksum size " +
238            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
239            " sumLenread: " + sumLenRead,
240            pos);
241        }
242        if (sumLenRead <= 0) { // we're at the end of the file
243          eof = true;
244        } else {
245          // Adjust amount of data to read based on how many checksum chunks we read
246          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
247        }
248      }
249      if(pos != datas.getPos()) {
250        datas.seek(pos);
251      }
252      int nread = readFully(datas, buf, offset, len);
253      if (eof && nread > 0) {
254        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
255      }
256      return nread;
257    }
258  }
259  
260  private static class FSDataBoundedInputStream extends FSDataInputStream {
261    private FileSystem fs;
262    private Path file;
263    private long fileLen = -1L;
264
265    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
266      super(in);
267      this.fs = fs;
268      this.file = file;
269    }
270    
271    @Override
272    public boolean markSupported() {
273      return false;
274    }
275    
276    /* Return the file length */
277    private long getFileLength() throws IOException {
278      if( fileLen==-1L ) {
279        fileLen = fs.getContentSummary(file).getLength();
280      }
281      return fileLen;
282    }
283    
284    /**
285     * Skips over and discards <code>n</code> bytes of data from the
286     * input stream.
287     *
288     *The <code>skip</code> method skips over some smaller number of bytes
289     * when reaching end of file before <code>n</code> bytes have been skipped.
290     * The actual number of bytes skipped is returned.  If <code>n</code> is
291     * negative, no bytes are skipped.
292     *
293     * @param      n   the number of bytes to be skipped.
294     * @return     the actual number of bytes skipped.
295     * @exception  IOException  if an I/O error occurs.
296     *             ChecksumException if the chunk to skip to is corrupted
297     */
298    @Override
299    public synchronized long skip(long n) throws IOException {
300      long curPos = getPos();
301      long fileLength = getFileLength();
302      if( n+curPos > fileLength ) {
303        n = fileLength - curPos;
304      }
305      return super.skip(n);
306    }
307    
308    /**
309     * Seek to the given position in the stream.
310     * The next read() will be from that position.
311     * 
312     * <p>This method does not allow seek past the end of the file.
313     * This produces IOException.
314     *
315     * @param      pos   the postion to seek to.
316     * @exception  IOException  if an I/O error occurs or seeks after EOF
317     *             ChecksumException if the chunk to seek to is corrupted
318     */
319
320    @Override
321    public synchronized void seek(long pos) throws IOException {
322      if (pos > getFileLength()) {
323        throw new EOFException("Cannot seek after EOF");
324      }
325      super.seek(pos);
326    }
327
328  }
329
330  /**
331   * Opens an FSDataInputStream at the indicated Path.
332   * @param f the file name to open
333   * @param bufferSize the size of the buffer to be used.
334   */
335  @Override
336  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
337    FileSystem fs;
338    InputStream in;
339    if (verifyChecksum) {
340      fs = this;
341      in = new ChecksumFSInputChecker(this, f, bufferSize);
342    } else {
343      fs = getRawFileSystem();
344      in = fs.open(f, bufferSize);
345    }
346    return new FSDataBoundedInputStream(fs, f, in);
347  }
348
349  @Override
350  public FSDataOutputStream append(Path f, int bufferSize,
351      Progressable progress) throws IOException {
352    throw new IOException("Not supported");
353  }
354
355  @Override
356  public boolean truncate(Path f, long newLength) throws IOException {
357    throw new IOException("Not supported");
358  }
359
360  /**
361   * Calculated the length of the checksum file in bytes.
362   * @param size the length of the data file in bytes
363   * @param bytesPerSum the number of bytes in a checksum block
364   * @return the number of bytes in the checksum file
365   */
366  public static long getChecksumLength(long size, int bytesPerSum) {
367    //the checksum length is equal to size passed divided by bytesPerSum +
368    //bytes written in the beginning of the checksum file.  
369    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
370             CHECKSUM_VERSION.length + 4;  
371  }
372
373  /** This class provides an output stream for a checksummed file.
374   * It generates checksums for data. */
375  private static class ChecksumFSOutputSummer extends FSOutputSummer {
376    private FSDataOutputStream datas;    
377    private FSDataOutputStream sums;
378    private static final float CHKSUM_AS_FRACTION = 0.01f;
379    private boolean isClosed = false;
380    
381    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
382                          Path file, 
383                          boolean overwrite,
384                          int bufferSize,
385                          short replication,
386                          long blockSize,
387                          Progressable progress,
388                          FsPermission permission)
389      throws IOException {
390      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
391          fs.getBytesPerSum()));
392      int bytesPerSum = fs.getBytesPerSum();
393      this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
394                                         bufferSize, replication, blockSize,
395                                         progress);
396      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
397      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
398                                               permission, true, sumBufferSize,
399                                               replication, blockSize, null);
400      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
401      sums.writeInt(bytesPerSum);
402    }
403    
404    @Override
405    public void close() throws IOException {
406      try {
407        flushBuffer();
408        sums.close();
409        datas.close();
410      } finally {
411        isClosed = true;
412      }
413    }
414    
415    @Override
416    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
417        int ckoff, int cklen)
418    throws IOException {
419      datas.write(b, offset, len);
420      sums.write(checksum, ckoff, cklen);
421    }
422
423    @Override
424    protected void checkClosed() throws IOException {
425      if (isClosed) {
426        throw new ClosedChannelException();
427      }
428    }
429  }
430
431  @Override
432  public FSDataOutputStream create(Path f, FsPermission permission,
433      boolean overwrite, int bufferSize, short replication, long blockSize,
434      Progressable progress) throws IOException {
435    return create(f, permission, overwrite, true, bufferSize,
436        replication, blockSize, progress);
437  }
438
439  private FSDataOutputStream create(Path f, FsPermission permission,
440      boolean overwrite, boolean createParent, int bufferSize,
441      short replication, long blockSize,
442      Progressable progress) throws IOException {
443    Path parent = f.getParent();
444    if (parent != null) {
445      if (!createParent && !exists(parent)) {
446        throw new FileNotFoundException("Parent directory doesn't exist: "
447            + parent);
448      } else if (!mkdirs(parent)) {
449        throw new IOException("Mkdirs failed to create " + parent
450            + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
451            + ")");
452      }
453    }
454    final FSDataOutputStream out;
455    if (writeChecksum) {
456      out = new FSDataOutputStream(
457          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
458              blockSize, progress, permission), null);
459    } else {
460      out = fs.create(f, permission, overwrite, bufferSize, replication,
461          blockSize, progress);
462      // remove the checksum file since we aren't writing one
463      Path checkFile = getChecksumFile(f);
464      if (fs.exists(checkFile)) {
465        fs.delete(checkFile, true);
466      }
467    }
468    return out;
469  }
470
471  @Override
472  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
473      boolean overwrite, int bufferSize, short replication, long blockSize,
474      Progressable progress) throws IOException {
475    return create(f, permission, overwrite, false, bufferSize, replication,
476        blockSize, progress);
477  }
478
479  /**
480   * Set replication for an existing file.
481   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
482   * @param src file name
483   * @param replication new replication
484   * @throws IOException
485   * @return true if successful;
486   *         false if file does not exist or is a directory
487   */
488  @Override
489  public boolean setReplication(Path src, short replication) throws IOException {
490    boolean value = fs.setReplication(src, replication);
491    if (!value)
492      return false;
493
494    Path checkFile = getChecksumFile(src);
495    if (exists(checkFile))
496      fs.setReplication(checkFile, replication);
497
498    return true;
499  }
500
501  /**
502   * Rename files/dirs
503   */
504  @Override
505  public boolean rename(Path src, Path dst) throws IOException {
506    if (fs.isDirectory(src)) {
507      return fs.rename(src, dst);
508    } else {
509      if (fs.isDirectory(dst)) {
510        dst = new Path(dst, src.getName());
511      }
512
513      boolean value = fs.rename(src, dst);
514      if (!value)
515        return false;
516
517      Path srcCheckFile = getChecksumFile(src);
518      Path dstCheckFile = getChecksumFile(dst);
519      if (fs.exists(srcCheckFile)) { //try to rename checksum
520        value = fs.rename(srcCheckFile, dstCheckFile);
521      } else if (fs.exists(dstCheckFile)) {
522        // no src checksum, so remove dst checksum
523        value = fs.delete(dstCheckFile, true); 
524      }
525
526      return value;
527    }
528  }
529
530  /**
531   * Implement the delete(Path, boolean) in checksum
532   * file system.
533   */
534  @Override
535  public boolean delete(Path f, boolean recursive) throws IOException{
536    FileStatus fstatus = null;
537    try {
538      fstatus = fs.getFileStatus(f);
539    } catch(FileNotFoundException e) {
540      return false;
541    }
542    if (fstatus.isDirectory()) {
543      //this works since the crcs are in the same
544      //directories and the files. so we just delete
545      //everything in the underlying filesystem
546      return fs.delete(f, recursive);
547    } else {
548      Path checkFile = getChecksumFile(f);
549      if (fs.exists(checkFile)) {
550        fs.delete(checkFile, true);
551      }
552      return fs.delete(f, true);
553    }
554  }
555    
556  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
557    @Override
558    public boolean accept(Path file) {
559      return !isChecksumFile(file);
560    }
561  };
562
563  /**
564   * List the statuses of the files/directories in the given path if the path is
565   * a directory.
566   * 
567   * @param f
568   *          given path
569   * @return the statuses of the files/directories in the given path
570   * @throws IOException
571   */
572  @Override
573  public FileStatus[] listStatus(Path f) throws IOException {
574    return fs.listStatus(f, DEFAULT_FILTER);
575  }
576  
577  /**
578   * List the statuses of the files/directories in the given path if the path is
579   * a directory.
580   * 
581   * @param f
582   *          given path
583   * @return the statuses of the files/directories in the given patch
584   * @throws IOException
585   */
586  @Override
587  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
588  throws IOException {
589    return fs.listLocatedStatus(f, DEFAULT_FILTER);
590  }
591  
592  @Override
593  public boolean mkdirs(Path f) throws IOException {
594    return fs.mkdirs(f);
595  }
596
597  @Override
598  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
599    throws IOException {
600    Configuration conf = getConf();
601    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
602  }
603
604  /**
605   * The src file is under FS, and the dst is on the local disk.
606   * Copy it from FS control to the local dst name.
607   */
608  @Override
609  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
610    throws IOException {
611    Configuration conf = getConf();
612    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
613  }
614
615  /**
616   * The src file is under FS, and the dst is on the local disk.
617   * Copy it from FS control to the local dst name.
618   * If src and dst are directories, the copyCrc parameter
619   * determines whether to copy CRC files.
620   */
621  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
622    throws IOException {
623    if (!fs.isDirectory(src)) { // source is a file
624      fs.copyToLocalFile(src, dst);
625      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
626      if (localFs.isDirectory(dst)) {
627        dst = new Path(dst, src.getName());
628      }
629      dst = getChecksumFile(dst);
630      if (localFs.exists(dst)) { //remove old local checksum file
631        localFs.delete(dst, true);
632      }
633      Path checksumFile = getChecksumFile(src);
634      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
635        fs.copyToLocalFile(checksumFile, dst);
636      }
637    } else {
638      FileStatus[] srcs = listStatus(src);
639      for (FileStatus srcFile : srcs) {
640        copyToLocalFile(srcFile.getPath(), 
641                        new Path(dst, srcFile.getPath().getName()), copyCrc);
642      }
643    }
644  }
645
646  @Override
647  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
648    throws IOException {
649    return tmpLocalFile;
650  }
651
652  @Override
653  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
654    throws IOException {
655    moveFromLocalFile(tmpLocalFile, fsOutputFile);
656  }
657
658  /**
659   * Report a checksum error to the file system.
660   * @param f the file name containing the error
661   * @param in the stream open on the file
662   * @param inPos the position of the beginning of the bad data in the file
663   * @param sums the stream open on the checksum file
664   * @param sumsPos the position of the beginning of the bad data in the checksum file
665   * @return if retry is neccessary
666   */
667  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
668                                       long inPos, FSDataInputStream sums, long sumsPos) {
669    return false;
670  }
671}