001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs;
020
021import java.io.*;
022import java.util.Arrays;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.permission.FsPermission;
030import org.apache.hadoop.util.Progressable;
031import org.apache.hadoop.util.PureJavaCrc32;
032
/****************************************************************
 * Abstract Checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
040@InterfaceAudience.Public
041@InterfaceStability.Stable
042public abstract class ChecksumFileSystem extends FilterFileSystem {
043  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
044  private int bytesPerChecksum = 512;
045  private boolean verifyChecksum = true;
046  private boolean writeChecksum = true;
047
048  public static double getApproxChkSumLength(long size) {
049    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
050  }
051  
052  public ChecksumFileSystem(FileSystem fs) {
053    super(fs);
054  }
055
056  public void setConf(Configuration conf) {
057    super.setConf(conf);
058    if (conf != null) {
059      bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
060                                     LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
061    }
062  }
063  
064  /**
065   * Set whether to verify checksum.
066   */
067  public void setVerifyChecksum(boolean verifyChecksum) {
068    this.verifyChecksum = verifyChecksum;
069  }
070
071  @Override
072  public void setWriteChecksum(boolean writeChecksum) {
073    this.writeChecksum = writeChecksum;
074  }
075  
076  /** get the raw file system */
077  public FileSystem getRawFileSystem() {
078    return fs;
079  }
080
081  /** Return the name of the checksum file associated with a file.*/
082  public Path getChecksumFile(Path file) {
083    return new Path(file.getParent(), "." + file.getName() + ".crc");
084  }
085
086  /** Return true iff file is a checksum file name.*/
087  public static boolean isChecksumFile(Path file) {
088    String name = file.getName();
089    return name.startsWith(".") && name.endsWith(".crc");
090  }
091
092  /** Return the length of the checksum file given the size of the 
093   * actual file.
094   **/
095  public long getChecksumFileLength(Path file, long fileSize) {
096    return getChecksumLength(fileSize, getBytesPerSum());
097  }
098
099  /** Return the bytes Per Checksum */
100  public int getBytesPerSum() {
101    return bytesPerChecksum;
102  }
103
104  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
105    int defaultBufferSize = getConf().getInt(
106                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
107                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
108    int proportionalBufferSize = bufferSize / bytesPerSum;
109    return Math.max(bytesPerSum,
110                    Math.max(proportionalBufferSize, defaultBufferSize));
111  }
112
113  /*******************************************************
114   * For open()'s FSInputStream
115   * It verifies that data matches checksums.
116   *******************************************************/
117  private static class ChecksumFSInputChecker extends FSInputChecker {
118    public static final Log LOG 
119      = LogFactory.getLog(FSInputChecker.class);
120    
121    private ChecksumFileSystem fs;
122    private FSDataInputStream datas;
123    private FSDataInputStream sums;
124    
125    private static final int HEADER_LENGTH = 8;
126    
127    private int bytesPerSum = 1;
128    
129    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
130      throws IOException {
131      this(fs, file, fs.getConf().getInt(
132                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
133                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
134    }
135    
136    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
137      throws IOException {
138      super( file, fs.getFileStatus(file).getReplication() );
139      this.datas = fs.getRawFileSystem().open(file, bufferSize);
140      this.fs = fs;
141      Path sumFile = fs.getChecksumFile(file);
142      try {
143        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
144        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
145
146        byte[] version = new byte[CHECKSUM_VERSION.length];
147        sums.readFully(version);
148        if (!Arrays.equals(version, CHECKSUM_VERSION))
149          throw new IOException("Not a checksum file: "+sumFile);
150        this.bytesPerSum = sums.readInt();
151        set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
152      } catch (FileNotFoundException e) {         // quietly ignore
153        set(fs.verifyChecksum, null, 1, 0);
154      } catch (IOException e) {                   // loudly ignore
155        LOG.warn("Problem opening checksum file: "+ file + 
156                 ".  Ignoring exception: " , e); 
157        set(fs.verifyChecksum, null, 1, 0);
158      }
159    }
160    
161    private long getChecksumFilePos( long dataPos ) {
162      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
163    }
164    
165    protected long getChunkPosition( long dataPos ) {
166      return dataPos/bytesPerSum*bytesPerSum;
167    }
168    
169    public int available() throws IOException {
170      return datas.available() + super.available();
171    }
172    
173    public int read(long position, byte[] b, int off, int len)
174      throws IOException {
175      // parameter check
176      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
177        throw new IndexOutOfBoundsException();
178      } else if (len == 0) {
179        return 0;
180      }
181      if( position<0 ) {
182        throw new IllegalArgumentException(
183            "Parameter position can not to be negative");
184      }
185
186      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
187      checker.seek(position);
188      int nread = checker.read(b, off, len);
189      checker.close();
190      return nread;
191    }
192    
193    public void close() throws IOException {
194      datas.close();
195      if( sums != null ) {
196        sums.close();
197      }
198      set(fs.verifyChecksum, null, 1, 0);
199    }
200    
201
202    @Override
203    public boolean seekToNewSource(long targetPos) throws IOException {
204      long sumsPos = getChecksumFilePos(targetPos);
205      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
206      boolean newDataSource = datas.seekToNewSource(targetPos);
207      return sums.seekToNewSource(sumsPos) || newDataSource;
208    }
209
210    @Override
211    protected int readChunk(long pos, byte[] buf, int offset, int len,
212        byte[] checksum) throws IOException {
213
214      boolean eof = false;
215      if (needChecksum()) {
216        assert checksum != null; // we have a checksum buffer
217        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
218        assert len >= bytesPerSum; // we must read at least one chunk
219
220        final int checksumsToRead = Math.min(
221          len/bytesPerSum, // number of checksums based on len to read
222          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
223        long checksumPos = getChecksumFilePos(pos); 
224        if(checksumPos != sums.getPos()) {
225          sums.seek(checksumPos);
226        }
227
228        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
229        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
230          throw new ChecksumException(
231            "Checksum file not a length multiple of checksum size " +
232            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
233            " sumLenread: " + sumLenRead,
234            pos);
235        }
236        if (sumLenRead <= 0) { // we're at the end of the file
237          eof = true;
238        } else {
239          // Adjust amount of data to read based on how many checksum chunks we read
240          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
241        }
242      }
243      if(pos != datas.getPos()) {
244        datas.seek(pos);
245      }
246      int nread = readFully(datas, buf, offset, len);
247      if (eof && nread > 0) {
248        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
249      }
250      return nread;
251    }
252  }
253  
254  private static class FSDataBoundedInputStream extends FSDataInputStream {
255    private FileSystem fs;
256    private Path file;
257    private long fileLen = -1L;
258
259    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in)
260        throws IOException {
261      super(in);
262      this.fs = fs;
263      this.file = file;
264    }
265    
266    @Override
267    public boolean markSupported() {
268      return false;
269    }
270    
271    /* Return the file length */
272    private long getFileLength() throws IOException {
273      if( fileLen==-1L ) {
274        fileLen = fs.getContentSummary(file).getLength();
275      }
276      return fileLen;
277    }
278    
279    /**
280     * Skips over and discards <code>n</code> bytes of data from the
281     * input stream.
282     *
283     *The <code>skip</code> method skips over some smaller number of bytes
284     * when reaching end of file before <code>n</code> bytes have been skipped.
285     * The actual number of bytes skipped is returned.  If <code>n</code> is
286     * negative, no bytes are skipped.
287     *
288     * @param      n   the number of bytes to be skipped.
289     * @return     the actual number of bytes skipped.
290     * @exception  IOException  if an I/O error occurs.
291     *             ChecksumException if the chunk to skip to is corrupted
292     */
293    public synchronized long skip(long n) throws IOException {
294      long curPos = getPos();
295      long fileLength = getFileLength();
296      if( n+curPos > fileLength ) {
297        n = fileLength - curPos;
298      }
299      return super.skip(n);
300    }
301    
302    /**
303     * Seek to the given position in the stream.
304     * The next read() will be from that position.
305     * 
306     * <p>This method does not allow seek past the end of the file.
307     * This produces IOException.
308     *
309     * @param      pos   the postion to seek to.
310     * @exception  IOException  if an I/O error occurs or seeks after EOF
311     *             ChecksumException if the chunk to seek to is corrupted
312     */
313
314    public synchronized void seek(long pos) throws IOException {
315      if(pos>getFileLength()) {
316        throw new IOException("Cannot seek after EOF");
317      }
318      super.seek(pos);
319    }
320
321  }
322
323  /**
324   * Opens an FSDataInputStream at the indicated Path.
325   * @param f the file name to open
326   * @param bufferSize the size of the buffer to be used.
327   */
328  @Override
329  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
330    FileSystem fs;
331    InputStream in;
332    if (verifyChecksum) {
333      fs = this;
334      in = new ChecksumFSInputChecker(this, f, bufferSize);
335    } else {
336      fs = getRawFileSystem();
337      in = fs.open(f, bufferSize);
338    }
339    return new FSDataBoundedInputStream(fs, f, in);
340  }
341
342  /** {@inheritDoc} */
343  public FSDataOutputStream append(Path f, int bufferSize,
344      Progressable progress) throws IOException {
345    throw new IOException("Not supported");
346  }
347
348  /**
349   * Calculated the length of the checksum file in bytes.
350   * @param size the length of the data file in bytes
351   * @param bytesPerSum the number of bytes in a checksum block
352   * @return the number of bytes in the checksum file
353   */
354  public static long getChecksumLength(long size, int bytesPerSum) {
355    //the checksum length is equal to size passed divided by bytesPerSum +
356    //bytes written in the beginning of the checksum file.  
357    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
358             CHECKSUM_VERSION.length + 4;  
359  }
360
361  /** This class provides an output stream for a checksummed file.
362   * It generates checksums for data. */
363  private static class ChecksumFSOutputSummer extends FSOutputSummer {
364    private FSDataOutputStream datas;    
365    private FSDataOutputStream sums;
366    private static final float CHKSUM_AS_FRACTION = 0.01f;
367    
368    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
369                          Path file, 
370                          boolean overwrite, 
371                          short replication,
372                          long blockSize,
373                          Configuration conf)
374      throws IOException {
375      this(fs, file, overwrite, 
376           conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
377                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT),
378           replication, blockSize, null);
379    }
380    
381    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
382                          Path file, 
383                          boolean overwrite,
384                          int bufferSize,
385                          short replication,
386                          long blockSize,
387                          Progressable progress)
388      throws IOException {
389      super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
390      int bytesPerSum = fs.getBytesPerSum();
391      this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 
392                                         replication, blockSize, progress);
393      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
394      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 
395                                               sumBufferSize, replication,
396                                               blockSize);
397      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
398      sums.writeInt(bytesPerSum);
399    }
400    
401    public void close() throws IOException {
402      flushBuffer();
403      sums.close();
404      datas.close();
405    }
406    
407    @Override
408    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
409    throws IOException {
410      datas.write(b, offset, len);
411      sums.write(checksum);
412    }
413  }
414
415  /** {@inheritDoc} */
416  @Override
417  public FSDataOutputStream create(Path f, FsPermission permission,
418      boolean overwrite, int bufferSize, short replication, long blockSize,
419      Progressable progress) throws IOException {
420    return create(f, permission, overwrite, true, bufferSize,
421        replication, blockSize, progress);
422  }
423
424  private FSDataOutputStream create(Path f, FsPermission permission,
425      boolean overwrite, boolean createParent, int bufferSize,
426      short replication, long blockSize,
427      Progressable progress) throws IOException {
428    Path parent = f.getParent();
429    if (parent != null) {
430      if (!createParent && !exists(parent)) {
431        throw new FileNotFoundException("Parent directory doesn't exist: "
432            + parent);
433      } else if (!mkdirs(parent)) {
434        throw new IOException("Mkdirs failed to create " + parent);
435      }
436    }
437    final FSDataOutputStream out;
438    if (writeChecksum) {
439      out = new FSDataOutputStream(
440          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
441              blockSize, progress), null);
442    } else {
443      out = fs.create(f, permission, overwrite, bufferSize, replication,
444          blockSize, progress);
445      // remove the checksum file since we aren't writing one
446      Path checkFile = getChecksumFile(f);
447      if (fs.exists(checkFile)) {
448        fs.delete(checkFile, true);
449      }
450    }
451    if (permission != null) {
452      setPermission(f, permission);
453    }
454    return out;
455  }
456
457  /** {@inheritDoc} */
458  @Override
459  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
460      boolean overwrite, int bufferSize, short replication, long blockSize,
461      Progressable progress) throws IOException {
462    return create(f, permission, overwrite, false, bufferSize, replication,
463        blockSize, progress);
464  }
465
466  /**
467   * Set replication for an existing file.
468   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
469   * @param src file name
470   * @param replication new replication
471   * @throws IOException
472   * @return true if successful;
473   *         false if file does not exist or is a directory
474   */
475  public boolean setReplication(Path src, short replication) throws IOException {
476    boolean value = fs.setReplication(src, replication);
477    if (!value)
478      return false;
479
480    Path checkFile = getChecksumFile(src);
481    if (exists(checkFile))
482      fs.setReplication(checkFile, replication);
483
484    return true;
485  }
486
487  /**
488   * Rename files/dirs
489   */
490  public boolean rename(Path src, Path dst) throws IOException {
491    if (fs.isDirectory(src)) {
492      return fs.rename(src, dst);
493    } else {
494      if (fs.isDirectory(dst)) {
495        dst = new Path(dst, src.getName());
496      }
497
498      boolean value = fs.rename(src, dst);
499      if (!value)
500        return false;
501
502      Path srcCheckFile = getChecksumFile(src);
503      Path dstCheckFile = getChecksumFile(dst);
504      if (fs.exists(srcCheckFile)) { //try to rename checksum
505        value = fs.rename(srcCheckFile, dstCheckFile);
506      } else if (fs.exists(dstCheckFile)) {
507        // no src checksum, so remove dst checksum
508        value = fs.delete(dstCheckFile, true); 
509      }
510
511      return value;
512    }
513  }
514
515  /**
516   * Implement the delete(Path, boolean) in checksum
517   * file system.
518   */
519  public boolean delete(Path f, boolean recursive) throws IOException{
520    FileStatus fstatus = null;
521    try {
522      fstatus = fs.getFileStatus(f);
523    } catch(FileNotFoundException e) {
524      return false;
525    }
526    if (fstatus.isDirectory()) {
527      //this works since the crcs are in the same
528      //directories and the files. so we just delete
529      //everything in the underlying filesystem
530      return fs.delete(f, recursive);
531    } else {
532      Path checkFile = getChecksumFile(f);
533      if (fs.exists(checkFile)) {
534        fs.delete(checkFile, true);
535      }
536      return fs.delete(f, true);
537    }
538  }
539    
540  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
541    public boolean accept(Path file) {
542      return !isChecksumFile(file);
543    }
544  };
545
546  /**
547   * List the statuses of the files/directories in the given path if the path is
548   * a directory.
549   * 
550   * @param f
551   *          given path
552   * @return the statuses of the files/directories in the given patch
553   * @throws IOException
554   */
555  @Override
556  public FileStatus[] listStatus(Path f) throws IOException {
557    return fs.listStatus(f, DEFAULT_FILTER);
558  }
559  
560  /**
561   * List the statuses of the files/directories in the given path if the path is
562   * a directory.
563   * 
564   * @param f
565   *          given path
566   * @return the statuses of the files/directories in the given patch
567   * @throws IOException
568   */
569  @Override
570  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
571  throws IOException {
572    return fs.listLocatedStatus(f, DEFAULT_FILTER);
573  }
574  
575  @Override
576  public boolean mkdirs(Path f) throws IOException {
577    return fs.mkdirs(f);
578  }
579
580  @Override
581  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
582    throws IOException {
583    Configuration conf = getConf();
584    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
585  }
586
587  /**
588   * The src file is under FS, and the dst is on the local disk.
589   * Copy it from FS control to the local dst name.
590   */
591  @Override
592  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
593    throws IOException {
594    Configuration conf = getConf();
595    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
596  }
597
598  /**
599   * The src file is under FS, and the dst is on the local disk.
600   * Copy it from FS control to the local dst name.
601   * If src and dst are directories, the copyCrc parameter
602   * determines whether to copy CRC files.
603   */
604  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
605    throws IOException {
606    if (!fs.isDirectory(src)) { // source is a file
607      fs.copyToLocalFile(src, dst);
608      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
609      if (localFs.isDirectory(dst)) {
610        dst = new Path(dst, src.getName());
611      }
612      dst = getChecksumFile(dst);
613      if (localFs.exists(dst)) { //remove old local checksum file
614        localFs.delete(dst, true);
615      }
616      Path checksumFile = getChecksumFile(src);
617      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
618        fs.copyToLocalFile(checksumFile, dst);
619      }
620    } else {
621      FileStatus[] srcs = listStatus(src);
622      for (FileStatus srcFile : srcs) {
623        copyToLocalFile(srcFile.getPath(), 
624                        new Path(dst, srcFile.getPath().getName()), copyCrc);
625      }
626    }
627  }
628
629  @Override
630  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
631    throws IOException {
632    return tmpLocalFile;
633  }
634
635  @Override
636  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
637    throws IOException {
638    moveFromLocalFile(tmpLocalFile, fsOutputFile);
639  }
640
641  /**
642   * Report a checksum error to the file system.
643   * @param f the file name containing the error
644   * @param in the stream open on the file
645   * @param inPos the position of the beginning of the bad data in the file
646   * @param sums the stream open on the checksum file
647   * @param sumsPos the position of the beginning of the bad data in the checksum file
648   * @return if retry is neccessary
649   */
650  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
651                                       long inPos, FSDataInputStream sums, long sumsPos) {
652    return false;
653  }
654}