001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs;
020    
021    import java.io.EOFException;
022    import java.io.FileNotFoundException;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.nio.channels.ClosedChannelException;
026    import java.util.Arrays;
027    
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.permission.FsPermission;
032    import org.apache.hadoop.util.DataChecksum;
033    import org.apache.hadoop.util.Progressable;
034    
/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
042    @InterfaceAudience.Public
043    @InterfaceStability.Stable
044    public abstract class ChecksumFileSystem extends FilterFileSystem {
045      private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
046      private int bytesPerChecksum = 512;
047      private boolean verifyChecksum = true;
048      private boolean writeChecksum = true;
049    
050      public static double getApproxChkSumLength(long size) {
051        return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
052      }
053      
054      public ChecksumFileSystem(FileSystem fs) {
055        super(fs);
056      }
057    
058      @Override
059      public void setConf(Configuration conf) {
060        super.setConf(conf);
061        if (conf != null) {
062          bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
063                                         LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
064        }
065      }
066      
067      /**
068       * Set whether to verify checksum.
069       */
070      @Override
071      public void setVerifyChecksum(boolean verifyChecksum) {
072        this.verifyChecksum = verifyChecksum;
073      }
074    
075      @Override
076      public void setWriteChecksum(boolean writeChecksum) {
077        this.writeChecksum = writeChecksum;
078      }
079      
080      /** get the raw file system */
081      @Override
082      public FileSystem getRawFileSystem() {
083        return fs;
084      }
085    
086      /** Return the name of the checksum file associated with a file.*/
087      public Path getChecksumFile(Path file) {
088        return new Path(file.getParent(), "." + file.getName() + ".crc");
089      }
090    
091      /** Return true iff file is a checksum file name.*/
092      public static boolean isChecksumFile(Path file) {
093        String name = file.getName();
094        return name.startsWith(".") && name.endsWith(".crc");
095      }
096    
097      /** Return the length of the checksum file given the size of the 
098       * actual file.
099       **/
100      public long getChecksumFileLength(Path file, long fileSize) {
101        return getChecksumLength(fileSize, getBytesPerSum());
102      }
103    
104      /** Return the bytes Per Checksum */
105      public int getBytesPerSum() {
106        return bytesPerChecksum;
107      }
108    
109      private int getSumBufferSize(int bytesPerSum, int bufferSize) {
110        int defaultBufferSize = getConf().getInt(
111                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
112                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
113        int proportionalBufferSize = bufferSize / bytesPerSum;
114        return Math.max(bytesPerSum,
115                        Math.max(proportionalBufferSize, defaultBufferSize));
116      }
117    
118      /*******************************************************
119       * For open()'s FSInputStream
120       * It verifies that data matches checksums.
121       *******************************************************/
122      private static class ChecksumFSInputChecker extends FSInputChecker {
123        private ChecksumFileSystem fs;
124        private FSDataInputStream datas;
125        private FSDataInputStream sums;
126        
127        private static final int HEADER_LENGTH = 8;
128        
129        private int bytesPerSum = 1;
130        
131        public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
132          throws IOException {
133          this(fs, file, fs.getConf().getInt(
134                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
135                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
136        }
137        
138        public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
139          throws IOException {
140          super( file, fs.getFileStatus(file).getReplication() );
141          this.datas = fs.getRawFileSystem().open(file, bufferSize);
142          this.fs = fs;
143          Path sumFile = fs.getChecksumFile(file);
144          try {
145            int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
146            sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
147    
148            byte[] version = new byte[CHECKSUM_VERSION.length];
149            sums.readFully(version);
150            if (!Arrays.equals(version, CHECKSUM_VERSION))
151              throw new IOException("Not a checksum file: "+sumFile);
152            this.bytesPerSum = sums.readInt();
153            set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
154          } catch (FileNotFoundException e) {         // quietly ignore
155            set(fs.verifyChecksum, null, 1, 0);
156          } catch (IOException e) {                   // loudly ignore
157            LOG.warn("Problem opening checksum file: "+ file + 
158                     ".  Ignoring exception: " , e); 
159            set(fs.verifyChecksum, null, 1, 0);
160          }
161        }
162        
163        private long getChecksumFilePos( long dataPos ) {
164          return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
165        }
166        
167        @Override
168        protected long getChunkPosition( long dataPos ) {
169          return dataPos/bytesPerSum*bytesPerSum;
170        }
171        
172        @Override
173        public int available() throws IOException {
174          return datas.available() + super.available();
175        }
176        
177        @Override
178        public int read(long position, byte[] b, int off, int len)
179          throws IOException {
180          // parameter check
181          if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
182            throw new IndexOutOfBoundsException();
183          } else if (len == 0) {
184            return 0;
185          }
186          if( position<0 ) {
187            throw new IllegalArgumentException(
188                "Parameter position can not to be negative");
189          }
190    
191          ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
192          checker.seek(position);
193          int nread = checker.read(b, off, len);
194          checker.close();
195          return nread;
196        }
197        
198        @Override
199        public void close() throws IOException {
200          datas.close();
201          if( sums != null ) {
202            sums.close();
203          }
204          set(fs.verifyChecksum, null, 1, 0);
205        }
206        
207    
208        @Override
209        public boolean seekToNewSource(long targetPos) throws IOException {
210          long sumsPos = getChecksumFilePos(targetPos);
211          fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
212          boolean newDataSource = datas.seekToNewSource(targetPos);
213          return sums.seekToNewSource(sumsPos) || newDataSource;
214        }
215    
216        @Override
217        protected int readChunk(long pos, byte[] buf, int offset, int len,
218            byte[] checksum) throws IOException {
219    
220          boolean eof = false;
221          if (needChecksum()) {
222            assert checksum != null; // we have a checksum buffer
223            assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
224            assert len >= bytesPerSum; // we must read at least one chunk
225    
226            final int checksumsToRead = Math.min(
227              len/bytesPerSum, // number of checksums based on len to read
228              checksum.length / CHECKSUM_SIZE); // size of checksum buffer
229            long checksumPos = getChecksumFilePos(pos); 
230            if(checksumPos != sums.getPos()) {
231              sums.seek(checksumPos);
232            }
233    
234            int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
235            if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
236              throw new ChecksumException(
237                "Checksum file not a length multiple of checksum size " +
238                "in " + file + " at " + pos + " checksumpos: " + checksumPos +
239                " sumLenread: " + sumLenRead,
240                pos);
241            }
242            if (sumLenRead <= 0) { // we're at the end of the file
243              eof = true;
244            } else {
245              // Adjust amount of data to read based on how many checksum chunks we read
246              len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
247            }
248          }
249          if(pos != datas.getPos()) {
250            datas.seek(pos);
251          }
252          int nread = readFully(datas, buf, offset, len);
253          if (eof && nread > 0) {
254            throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
255          }
256          return nread;
257        }
258      }
259      
260      private static class FSDataBoundedInputStream extends FSDataInputStream {
261        private FileSystem fs;
262        private Path file;
263        private long fileLen = -1L;
264    
265        FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
266          super(in);
267          this.fs = fs;
268          this.file = file;
269        }
270        
271        @Override
272        public boolean markSupported() {
273          return false;
274        }
275        
276        /* Return the file length */
277        private long getFileLength() throws IOException {
278          if( fileLen==-1L ) {
279            fileLen = fs.getContentSummary(file).getLength();
280          }
281          return fileLen;
282        }
283        
284        /**
285         * Skips over and discards <code>n</code> bytes of data from the
286         * input stream.
287         *
288         *The <code>skip</code> method skips over some smaller number of bytes
289         * when reaching end of file before <code>n</code> bytes have been skipped.
290         * The actual number of bytes skipped is returned.  If <code>n</code> is
291         * negative, no bytes are skipped.
292         *
293         * @param      n   the number of bytes to be skipped.
294         * @return     the actual number of bytes skipped.
295         * @exception  IOException  if an I/O error occurs.
296         *             ChecksumException if the chunk to skip to is corrupted
297         */
298        @Override
299        public synchronized long skip(long n) throws IOException {
300          long curPos = getPos();
301          long fileLength = getFileLength();
302          if( n+curPos > fileLength ) {
303            n = fileLength - curPos;
304          }
305          return super.skip(n);
306        }
307        
308        /**
309         * Seek to the given position in the stream.
310         * The next read() will be from that position.
311         * 
312         * <p>This method does not allow seek past the end of the file.
313         * This produces IOException.
314         *
315         * @param      pos   the postion to seek to.
316         * @exception  IOException  if an I/O error occurs or seeks after EOF
317         *             ChecksumException if the chunk to seek to is corrupted
318         */
319    
320        @Override
321        public synchronized void seek(long pos) throws IOException {
322          if (pos > getFileLength()) {
323            throw new EOFException("Cannot seek after EOF");
324          }
325          super.seek(pos);
326        }
327    
328      }
329    
330      /**
331       * Opens an FSDataInputStream at the indicated Path.
332       * @param f the file name to open
333       * @param bufferSize the size of the buffer to be used.
334       */
335      @Override
336      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
337        FileSystem fs;
338        InputStream in;
339        if (verifyChecksum) {
340          fs = this;
341          in = new ChecksumFSInputChecker(this, f, bufferSize);
342        } else {
343          fs = getRawFileSystem();
344          in = fs.open(f, bufferSize);
345        }
346        return new FSDataBoundedInputStream(fs, f, in);
347      }
348    
349      @Override
350      public FSDataOutputStream append(Path f, int bufferSize,
351          Progressable progress) throws IOException {
352        throw new IOException("Not supported");
353      }
354    
355      /**
356       * Calculated the length of the checksum file in bytes.
357       * @param size the length of the data file in bytes
358       * @param bytesPerSum the number of bytes in a checksum block
359       * @return the number of bytes in the checksum file
360       */
361      public static long getChecksumLength(long size, int bytesPerSum) {
362        //the checksum length is equal to size passed divided by bytesPerSum +
363        //bytes written in the beginning of the checksum file.  
364        return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
365                 CHECKSUM_VERSION.length + 4;  
366      }
367    
368      /** This class provides an output stream for a checksummed file.
369       * It generates checksums for data. */
370      private static class ChecksumFSOutputSummer extends FSOutputSummer {
371        private FSDataOutputStream datas;    
372        private FSDataOutputStream sums;
373        private static final float CHKSUM_AS_FRACTION = 0.01f;
374        private boolean isClosed = false;
375        
376        public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
377                              Path file, 
378                              boolean overwrite,
379                              int bufferSize,
380                              short replication,
381                              long blockSize,
382                              Progressable progress)
383          throws IOException {
384          super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
385              fs.getBytesPerSum()));
386          int bytesPerSum = fs.getBytesPerSum();
387          this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 
388                                             replication, blockSize, progress);
389          int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
390          this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 
391                                                   sumBufferSize, replication,
392                                                   blockSize);
393          sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
394          sums.writeInt(bytesPerSum);
395        }
396        
397        @Override
398        public void close() throws IOException {
399          try {
400            flushBuffer();
401            sums.close();
402            datas.close();
403          } finally {
404            isClosed = true;
405          }
406        }
407        
408        @Override
409        protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
410            int ckoff, int cklen)
411        throws IOException {
412          datas.write(b, offset, len);
413          sums.write(checksum, ckoff, cklen);
414        }
415    
416        @Override
417        protected void checkClosed() throws IOException {
418          if (isClosed) {
419            throw new ClosedChannelException();
420          }
421        }
422      }
423    
424      @Override
425      public FSDataOutputStream create(Path f, FsPermission permission,
426          boolean overwrite, int bufferSize, short replication, long blockSize,
427          Progressable progress) throws IOException {
428        return create(f, permission, overwrite, true, bufferSize,
429            replication, blockSize, progress);
430      }
431    
432      private FSDataOutputStream create(Path f, FsPermission permission,
433          boolean overwrite, boolean createParent, int bufferSize,
434          short replication, long blockSize,
435          Progressable progress) throws IOException {
436        Path parent = f.getParent();
437        if (parent != null) {
438          if (!createParent && !exists(parent)) {
439            throw new FileNotFoundException("Parent directory doesn't exist: "
440                + parent);
441          } else if (!mkdirs(parent)) {
442            throw new IOException("Mkdirs failed to create " + parent
443                + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
444                + ")");
445          }
446        }
447        final FSDataOutputStream out;
448        if (writeChecksum) {
449          out = new FSDataOutputStream(
450              new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
451                  blockSize, progress), null);
452        } else {
453          out = fs.create(f, permission, overwrite, bufferSize, replication,
454              blockSize, progress);
455          // remove the checksum file since we aren't writing one
456          Path checkFile = getChecksumFile(f);
457          if (fs.exists(checkFile)) {
458            fs.delete(checkFile, true);
459          }
460        }
461        if (permission != null) {
462          setPermission(f, permission);
463        }
464        return out;
465      }
466    
467      @Override
468      public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
469          boolean overwrite, int bufferSize, short replication, long blockSize,
470          Progressable progress) throws IOException {
471        return create(f, permission, overwrite, false, bufferSize, replication,
472            blockSize, progress);
473      }
474    
475      /**
476       * Set replication for an existing file.
477       * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
478       * @param src file name
479       * @param replication new replication
480       * @throws IOException
481       * @return true if successful;
482       *         false if file does not exist or is a directory
483       */
484      @Override
485      public boolean setReplication(Path src, short replication) throws IOException {
486        boolean value = fs.setReplication(src, replication);
487        if (!value)
488          return false;
489    
490        Path checkFile = getChecksumFile(src);
491        if (exists(checkFile))
492          fs.setReplication(checkFile, replication);
493    
494        return true;
495      }
496    
497      /**
498       * Rename files/dirs
499       */
500      @Override
501      public boolean rename(Path src, Path dst) throws IOException {
502        if (fs.isDirectory(src)) {
503          return fs.rename(src, dst);
504        } else {
505          if (fs.isDirectory(dst)) {
506            dst = new Path(dst, src.getName());
507          }
508    
509          boolean value = fs.rename(src, dst);
510          if (!value)
511            return false;
512    
513          Path srcCheckFile = getChecksumFile(src);
514          Path dstCheckFile = getChecksumFile(dst);
515          if (fs.exists(srcCheckFile)) { //try to rename checksum
516            value = fs.rename(srcCheckFile, dstCheckFile);
517          } else if (fs.exists(dstCheckFile)) {
518            // no src checksum, so remove dst checksum
519            value = fs.delete(dstCheckFile, true); 
520          }
521    
522          return value;
523        }
524      }
525    
526      /**
527       * Implement the delete(Path, boolean) in checksum
528       * file system.
529       */
530      @Override
531      public boolean delete(Path f, boolean recursive) throws IOException{
532        FileStatus fstatus = null;
533        try {
534          fstatus = fs.getFileStatus(f);
535        } catch(FileNotFoundException e) {
536          return false;
537        }
538        if (fstatus.isDirectory()) {
539          //this works since the crcs are in the same
540          //directories and the files. so we just delete
541          //everything in the underlying filesystem
542          return fs.delete(f, recursive);
543        } else {
544          Path checkFile = getChecksumFile(f);
545          if (fs.exists(checkFile)) {
546            fs.delete(checkFile, true);
547          }
548          return fs.delete(f, true);
549        }
550      }
551        
552      final private static PathFilter DEFAULT_FILTER = new PathFilter() {
553        @Override
554        public boolean accept(Path file) {
555          return !isChecksumFile(file);
556        }
557      };
558    
559      /**
560       * List the statuses of the files/directories in the given path if the path is
561       * a directory.
562       * 
563       * @param f
564       *          given path
565       * @return the statuses of the files/directories in the given path
566       * @throws IOException
567       */
568      @Override
569      public FileStatus[] listStatus(Path f) throws IOException {
570        return fs.listStatus(f, DEFAULT_FILTER);
571      }
572      
573      /**
574       * List the statuses of the files/directories in the given path if the path is
575       * a directory.
576       * 
577       * @param f
578       *          given path
579       * @return the statuses of the files/directories in the given patch
580       * @throws IOException
581       */
582      @Override
583      public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
584      throws IOException {
585        return fs.listLocatedStatus(f, DEFAULT_FILTER);
586      }
587      
588      @Override
589      public boolean mkdirs(Path f) throws IOException {
590        return fs.mkdirs(f);
591      }
592    
593      @Override
594      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
595        throws IOException {
596        Configuration conf = getConf();
597        FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
598      }
599    
600      /**
601       * The src file is under FS, and the dst is on the local disk.
602       * Copy it from FS control to the local dst name.
603       */
604      @Override
605      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
606        throws IOException {
607        Configuration conf = getConf();
608        FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
609      }
610    
611      /**
612       * The src file is under FS, and the dst is on the local disk.
613       * Copy it from FS control to the local dst name.
614       * If src and dst are directories, the copyCrc parameter
615       * determines whether to copy CRC files.
616       */
617      public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
618        throws IOException {
619        if (!fs.isDirectory(src)) { // source is a file
620          fs.copyToLocalFile(src, dst);
621          FileSystem localFs = getLocal(getConf()).getRawFileSystem();
622          if (localFs.isDirectory(dst)) {
623            dst = new Path(dst, src.getName());
624          }
625          dst = getChecksumFile(dst);
626          if (localFs.exists(dst)) { //remove old local checksum file
627            localFs.delete(dst, true);
628          }
629          Path checksumFile = getChecksumFile(src);
630          if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
631            fs.copyToLocalFile(checksumFile, dst);
632          }
633        } else {
634          FileStatus[] srcs = listStatus(src);
635          for (FileStatus srcFile : srcs) {
636            copyToLocalFile(srcFile.getPath(), 
637                            new Path(dst, srcFile.getPath().getName()), copyCrc);
638          }
639        }
640      }
641    
642      @Override
643      public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
644        throws IOException {
645        return tmpLocalFile;
646      }
647    
648      @Override
649      public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
650        throws IOException {
651        moveFromLocalFile(tmpLocalFile, fsOutputFile);
652      }
653    
654      /**
655       * Report a checksum error to the file system.
656       * @param f the file name containing the error
657       * @param in the stream open on the file
658       * @param inPos the position of the beginning of the bad data in the file
659       * @param sums the stream open on the checksum file
660       * @param sumsPos the position of the beginning of the bad data in the checksum file
661       * @return if retry is neccessary
662       */
663      public boolean reportChecksumFailure(Path f, FSDataInputStream in,
664                                           long inPos, FSDataInputStream sums, long sumsPos) {
665        return false;
666      }
667    }