001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.fs;
020    
021    import java.io.*;
022    import java.nio.channels.ClosedChannelException;
023    import java.util.Arrays;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.hadoop.fs.permission.FsPermission;
029    import org.apache.hadoop.util.Progressable;
030    import org.apache.hadoop.util.PureJavaCrc32;
031    
032    /****************************************************************
033     * Abstract Checksumed FileSystem.
034     * It provide a basic implementation of a Checksumed FileSystem,
035     * which creates a checksum file for each raw file.
036     * It generates & verifies checksums at the client side.
037     *
038     *****************************************************************/
039    @InterfaceAudience.Public
040    @InterfaceStability.Stable
041    public abstract class ChecksumFileSystem extends FilterFileSystem {
042      private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
043      private int bytesPerChecksum = 512;
044      private boolean verifyChecksum = true;
045      private boolean writeChecksum = true;
046    
047      public static double getApproxChkSumLength(long size) {
048        return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
049      }
050      
051      public ChecksumFileSystem(FileSystem fs) {
052        super(fs);
053      }
054    
055      @Override
056      public void setConf(Configuration conf) {
057        super.setConf(conf);
058        if (conf != null) {
059          bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
060                                         LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
061        }
062      }
063      
064      /**
065       * Set whether to verify checksum.
066       */
067      @Override
068      public void setVerifyChecksum(boolean verifyChecksum) {
069        this.verifyChecksum = verifyChecksum;
070      }
071    
072      @Override
073      public void setWriteChecksum(boolean writeChecksum) {
074        this.writeChecksum = writeChecksum;
075      }
076      
077      /** get the raw file system */
078      @Override
079      public FileSystem getRawFileSystem() {
080        return fs;
081      }
082    
083      /** Return the name of the checksum file associated with a file.*/
084      public Path getChecksumFile(Path file) {
085        return new Path(file.getParent(), "." + file.getName() + ".crc");
086      }
087    
088      /** Return true iff file is a checksum file name.*/
089      public static boolean isChecksumFile(Path file) {
090        String name = file.getName();
091        return name.startsWith(".") && name.endsWith(".crc");
092      }
093    
094      /** Return the length of the checksum file given the size of the 
095       * actual file.
096       **/
097      public long getChecksumFileLength(Path file, long fileSize) {
098        return getChecksumLength(fileSize, getBytesPerSum());
099      }
100    
101      /** Return the bytes Per Checksum */
102      public int getBytesPerSum() {
103        return bytesPerChecksum;
104      }
105    
106      private int getSumBufferSize(int bytesPerSum, int bufferSize) {
107        int defaultBufferSize = getConf().getInt(
108                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
109                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
110        int proportionalBufferSize = bufferSize / bytesPerSum;
111        return Math.max(bytesPerSum,
112                        Math.max(proportionalBufferSize, defaultBufferSize));
113      }
114    
115      /*******************************************************
116       * For open()'s FSInputStream
117       * It verifies that data matches checksums.
118       *******************************************************/
119      private static class ChecksumFSInputChecker extends FSInputChecker {
120        private ChecksumFileSystem fs;
121        private FSDataInputStream datas;
122        private FSDataInputStream sums;
123        
124        private static final int HEADER_LENGTH = 8;
125        
126        private int bytesPerSum = 1;
127        
128        public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
129          throws IOException {
130          this(fs, file, fs.getConf().getInt(
131                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
132                           LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
133        }
134        
135        public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
136          throws IOException {
137          super( file, fs.getFileStatus(file).getReplication() );
138          this.datas = fs.getRawFileSystem().open(file, bufferSize);
139          this.fs = fs;
140          Path sumFile = fs.getChecksumFile(file);
141          try {
142            int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
143            sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
144    
145            byte[] version = new byte[CHECKSUM_VERSION.length];
146            sums.readFully(version);
147            if (!Arrays.equals(version, CHECKSUM_VERSION))
148              throw new IOException("Not a checksum file: "+sumFile);
149            this.bytesPerSum = sums.readInt();
150            set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
151          } catch (FileNotFoundException e) {         // quietly ignore
152            set(fs.verifyChecksum, null, 1, 0);
153          } catch (IOException e) {                   // loudly ignore
154            LOG.warn("Problem opening checksum file: "+ file + 
155                     ".  Ignoring exception: " , e); 
156            set(fs.verifyChecksum, null, 1, 0);
157          }
158        }
159        
160        private long getChecksumFilePos( long dataPos ) {
161          return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
162        }
163        
164        @Override
165        protected long getChunkPosition( long dataPos ) {
166          return dataPos/bytesPerSum*bytesPerSum;
167        }
168        
169        @Override
170        public int available() throws IOException {
171          return datas.available() + super.available();
172        }
173        
174        @Override
175        public int read(long position, byte[] b, int off, int len)
176          throws IOException {
177          // parameter check
178          if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
179            throw new IndexOutOfBoundsException();
180          } else if (len == 0) {
181            return 0;
182          }
183          if( position<0 ) {
184            throw new IllegalArgumentException(
185                "Parameter position can not to be negative");
186          }
187    
188          ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
189          checker.seek(position);
190          int nread = checker.read(b, off, len);
191          checker.close();
192          return nread;
193        }
194        
195        @Override
196        public void close() throws IOException {
197          datas.close();
198          if( sums != null ) {
199            sums.close();
200          }
201          set(fs.verifyChecksum, null, 1, 0);
202        }
203        
204    
205        @Override
206        public boolean seekToNewSource(long targetPos) throws IOException {
207          long sumsPos = getChecksumFilePos(targetPos);
208          fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
209          boolean newDataSource = datas.seekToNewSource(targetPos);
210          return sums.seekToNewSource(sumsPos) || newDataSource;
211        }
212    
213        @Override
214        protected int readChunk(long pos, byte[] buf, int offset, int len,
215            byte[] checksum) throws IOException {
216    
217          boolean eof = false;
218          if (needChecksum()) {
219            assert checksum != null; // we have a checksum buffer
220            assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
221            assert len >= bytesPerSum; // we must read at least one chunk
222    
223            final int checksumsToRead = Math.min(
224              len/bytesPerSum, // number of checksums based on len to read
225              checksum.length / CHECKSUM_SIZE); // size of checksum buffer
226            long checksumPos = getChecksumFilePos(pos); 
227            if(checksumPos != sums.getPos()) {
228              sums.seek(checksumPos);
229            }
230    
231            int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
232            if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
233              throw new ChecksumException(
234                "Checksum file not a length multiple of checksum size " +
235                "in " + file + " at " + pos + " checksumpos: " + checksumPos +
236                " sumLenread: " + sumLenRead,
237                pos);
238            }
239            if (sumLenRead <= 0) { // we're at the end of the file
240              eof = true;
241            } else {
242              // Adjust amount of data to read based on how many checksum chunks we read
243              len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
244            }
245          }
246          if(pos != datas.getPos()) {
247            datas.seek(pos);
248          }
249          int nread = readFully(datas, buf, offset, len);
250          if (eof && nread > 0) {
251            throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
252          }
253          return nread;
254        }
255      }
256      
257      private static class FSDataBoundedInputStream extends FSDataInputStream {
258        private FileSystem fs;
259        private Path file;
260        private long fileLen = -1L;
261    
262        FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in)
263            throws IOException {
264          super(in);
265          this.fs = fs;
266          this.file = file;
267        }
268        
269        @Override
270        public boolean markSupported() {
271          return false;
272        }
273        
274        /* Return the file length */
275        private long getFileLength() throws IOException {
276          if( fileLen==-1L ) {
277            fileLen = fs.getContentSummary(file).getLength();
278          }
279          return fileLen;
280        }
281        
282        /**
283         * Skips over and discards <code>n</code> bytes of data from the
284         * input stream.
285         *
286         *The <code>skip</code> method skips over some smaller number of bytes
287         * when reaching end of file before <code>n</code> bytes have been skipped.
288         * The actual number of bytes skipped is returned.  If <code>n</code> is
289         * negative, no bytes are skipped.
290         *
291         * @param      n   the number of bytes to be skipped.
292         * @return     the actual number of bytes skipped.
293         * @exception  IOException  if an I/O error occurs.
294         *             ChecksumException if the chunk to skip to is corrupted
295         */
296        @Override
297        public synchronized long skip(long n) throws IOException {
298          long curPos = getPos();
299          long fileLength = getFileLength();
300          if( n+curPos > fileLength ) {
301            n = fileLength - curPos;
302          }
303          return super.skip(n);
304        }
305        
306        /**
307         * Seek to the given position in the stream.
308         * The next read() will be from that position.
309         * 
310         * <p>This method does not allow seek past the end of the file.
311         * This produces IOException.
312         *
313         * @param      pos   the postion to seek to.
314         * @exception  IOException  if an I/O error occurs or seeks after EOF
315         *             ChecksumException if the chunk to seek to is corrupted
316         */
317    
318        @Override
319        public synchronized void seek(long pos) throws IOException {
320          if(pos>getFileLength()) {
321            throw new IOException("Cannot seek after EOF");
322          }
323          super.seek(pos);
324        }
325    
326      }
327    
328      /**
329       * Opens an FSDataInputStream at the indicated Path.
330       * @param f the file name to open
331       * @param bufferSize the size of the buffer to be used.
332       */
333      @Override
334      public FSDataInputStream open(Path f, int bufferSize) throws IOException {
335        FileSystem fs;
336        InputStream in;
337        if (verifyChecksum) {
338          fs = this;
339          in = new ChecksumFSInputChecker(this, f, bufferSize);
340        } else {
341          fs = getRawFileSystem();
342          in = fs.open(f, bufferSize);
343        }
344        return new FSDataBoundedInputStream(fs, f, in);
345      }
346    
347      @Override
348      public FSDataOutputStream append(Path f, int bufferSize,
349          Progressable progress) throws IOException {
350        throw new IOException("Not supported");
351      }
352    
353      /**
354       * Calculated the length of the checksum file in bytes.
355       * @param size the length of the data file in bytes
356       * @param bytesPerSum the number of bytes in a checksum block
357       * @return the number of bytes in the checksum file
358       */
359      public static long getChecksumLength(long size, int bytesPerSum) {
360        //the checksum length is equal to size passed divided by bytesPerSum +
361        //bytes written in the beginning of the checksum file.  
362        return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
363                 CHECKSUM_VERSION.length + 4;  
364      }
365    
366      /** This class provides an output stream for a checksummed file.
367       * It generates checksums for data. */
368      private static class ChecksumFSOutputSummer extends FSOutputSummer {
369        private FSDataOutputStream datas;    
370        private FSDataOutputStream sums;
371        private static final float CHKSUM_AS_FRACTION = 0.01f;
372        private boolean isClosed = false;
373        
374        public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
375                              Path file, 
376                              boolean overwrite,
377                              int bufferSize,
378                              short replication,
379                              long blockSize,
380                              Progressable progress)
381          throws IOException {
382          super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
383          int bytesPerSum = fs.getBytesPerSum();
384          this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 
385                                             replication, blockSize, progress);
386          int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
387          this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 
388                                                   sumBufferSize, replication,
389                                                   blockSize);
390          sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
391          sums.writeInt(bytesPerSum);
392        }
393        
394        @Override
395        public void close() throws IOException {
396          try {
397            flushBuffer();
398            sums.close();
399            datas.close();
400          } finally {
401            isClosed = true;
402          }
403        }
404        
405        @Override
406        protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
407        throws IOException {
408          datas.write(b, offset, len);
409          sums.write(checksum);
410        }
411    
412        @Override
413        protected void checkClosed() throws IOException {
414          if (isClosed) {
415            throw new ClosedChannelException();
416          }
417        }
418      }
419    
420      @Override
421      public FSDataOutputStream create(Path f, FsPermission permission,
422          boolean overwrite, int bufferSize, short replication, long blockSize,
423          Progressable progress) throws IOException {
424        return create(f, permission, overwrite, true, bufferSize,
425            replication, blockSize, progress);
426      }
427    
428      private FSDataOutputStream create(Path f, FsPermission permission,
429          boolean overwrite, boolean createParent, int bufferSize,
430          short replication, long blockSize,
431          Progressable progress) throws IOException {
432        Path parent = f.getParent();
433        if (parent != null) {
434          if (!createParent && !exists(parent)) {
435            throw new FileNotFoundException("Parent directory doesn't exist: "
436                + parent);
437          } else if (!mkdirs(parent)) {
438            throw new IOException("Mkdirs failed to create " + parent);
439          }
440        }
441        final FSDataOutputStream out;
442        if (writeChecksum) {
443          out = new FSDataOutputStream(
444              new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
445                  blockSize, progress), null);
446        } else {
447          out = fs.create(f, permission, overwrite, bufferSize, replication,
448              blockSize, progress);
449          // remove the checksum file since we aren't writing one
450          Path checkFile = getChecksumFile(f);
451          if (fs.exists(checkFile)) {
452            fs.delete(checkFile, true);
453          }
454        }
455        if (permission != null) {
456          setPermission(f, permission);
457        }
458        return out;
459      }
460    
461      @Override
462      public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
463          boolean overwrite, int bufferSize, short replication, long blockSize,
464          Progressable progress) throws IOException {
465        return create(f, permission, overwrite, false, bufferSize, replication,
466            blockSize, progress);
467      }
468    
469      /**
470       * Set replication for an existing file.
471       * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
472       * @param src file name
473       * @param replication new replication
474       * @throws IOException
475       * @return true if successful;
476       *         false if file does not exist or is a directory
477       */
478      @Override
479      public boolean setReplication(Path src, short replication) throws IOException {
480        boolean value = fs.setReplication(src, replication);
481        if (!value)
482          return false;
483    
484        Path checkFile = getChecksumFile(src);
485        if (exists(checkFile))
486          fs.setReplication(checkFile, replication);
487    
488        return true;
489      }
490    
491      /**
492       * Rename files/dirs
493       */
494      @Override
495      public boolean rename(Path src, Path dst) throws IOException {
496        if (fs.isDirectory(src)) {
497          return fs.rename(src, dst);
498        } else {
499          if (fs.isDirectory(dst)) {
500            dst = new Path(dst, src.getName());
501          }
502    
503          boolean value = fs.rename(src, dst);
504          if (!value)
505            return false;
506    
507          Path srcCheckFile = getChecksumFile(src);
508          Path dstCheckFile = getChecksumFile(dst);
509          if (fs.exists(srcCheckFile)) { //try to rename checksum
510            value = fs.rename(srcCheckFile, dstCheckFile);
511          } else if (fs.exists(dstCheckFile)) {
512            // no src checksum, so remove dst checksum
513            value = fs.delete(dstCheckFile, true); 
514          }
515    
516          return value;
517        }
518      }
519    
520      /**
521       * Implement the delete(Path, boolean) in checksum
522       * file system.
523       */
524      @Override
525      public boolean delete(Path f, boolean recursive) throws IOException{
526        FileStatus fstatus = null;
527        try {
528          fstatus = fs.getFileStatus(f);
529        } catch(FileNotFoundException e) {
530          return false;
531        }
532        if (fstatus.isDirectory()) {
533          //this works since the crcs are in the same
534          //directories and the files. so we just delete
535          //everything in the underlying filesystem
536          return fs.delete(f, recursive);
537        } else {
538          Path checkFile = getChecksumFile(f);
539          if (fs.exists(checkFile)) {
540            fs.delete(checkFile, true);
541          }
542          return fs.delete(f, true);
543        }
544      }
545        
546      final private static PathFilter DEFAULT_FILTER = new PathFilter() {
547        @Override
548        public boolean accept(Path file) {
549          return !isChecksumFile(file);
550        }
551      };
552    
553      /**
554       * List the statuses of the files/directories in the given path if the path is
555       * a directory.
556       * 
557       * @param f
558       *          given path
559       * @return the statuses of the files/directories in the given patch
560       * @throws IOException
561       */
562      @Override
563      public FileStatus[] listStatus(Path f) throws IOException {
564        return fs.listStatus(f, DEFAULT_FILTER);
565      }
566      
567      /**
568       * List the statuses of the files/directories in the given path if the path is
569       * a directory.
570       * 
571       * @param f
572       *          given path
573       * @return the statuses of the files/directories in the given patch
574       * @throws IOException
575       */
576      @Override
577      public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
578      throws IOException {
579        return fs.listLocatedStatus(f, DEFAULT_FILTER);
580      }
581      
582      @Override
583      public boolean mkdirs(Path f) throws IOException {
584        return fs.mkdirs(f);
585      }
586    
587      @Override
588      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
589        throws IOException {
590        Configuration conf = getConf();
591        FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
592      }
593    
594      /**
595       * The src file is under FS, and the dst is on the local disk.
596       * Copy it from FS control to the local dst name.
597       */
598      @Override
599      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
600        throws IOException {
601        Configuration conf = getConf();
602        FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
603      }
604    
605      /**
606       * The src file is under FS, and the dst is on the local disk.
607       * Copy it from FS control to the local dst name.
608       * If src and dst are directories, the copyCrc parameter
609       * determines whether to copy CRC files.
610       */
611      public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
612        throws IOException {
613        if (!fs.isDirectory(src)) { // source is a file
614          fs.copyToLocalFile(src, dst);
615          FileSystem localFs = getLocal(getConf()).getRawFileSystem();
616          if (localFs.isDirectory(dst)) {
617            dst = new Path(dst, src.getName());
618          }
619          dst = getChecksumFile(dst);
620          if (localFs.exists(dst)) { //remove old local checksum file
621            localFs.delete(dst, true);
622          }
623          Path checksumFile = getChecksumFile(src);
624          if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
625            fs.copyToLocalFile(checksumFile, dst);
626          }
627        } else {
628          FileStatus[] srcs = listStatus(src);
629          for (FileStatus srcFile : srcs) {
630            copyToLocalFile(srcFile.getPath(), 
631                            new Path(dst, srcFile.getPath().getName()), copyCrc);
632          }
633        }
634      }
635    
636      @Override
637      public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
638        throws IOException {
639        return tmpLocalFile;
640      }
641    
642      @Override
643      public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
644        throws IOException {
645        moveFromLocalFile(tmpLocalFile, fsOutputFile);
646      }
647    
648      /**
649       * Report a checksum error to the file system.
650       * @param f the file name containing the error
651       * @param in the stream open on the file
652       * @param inPos the position of the beginning of the bad data in the file
653       * @param sums the stream open on the checksum file
654       * @param sumsPos the position of the beginning of the bad data in the checksum file
655       * @return if retry is neccessary
656       */
657      public boolean reportChecksumFailure(Path f, FSDataInputStream in,
658                                           long inPos, FSDataInputStream sums, long sumsPos) {
659        return false;
660      }
661    }