001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs;
020
021 import java.io.*;
022 import java.nio.channels.ClosedChannelException;
023 import java.util.Arrays;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.hadoop.fs.permission.FsPermission;
029 import org.apache.hadoop.util.Progressable;
030 import org.apache.hadoop.util.PureJavaCrc32;
031
/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums on the client side.
 *
 *****************************************************************/
039 @InterfaceAudience.Public
040 @InterfaceStability.Stable
041 public abstract class ChecksumFileSystem extends FilterFileSystem {
/** Version marker written at the start of every checksum file and checked when one is opened. */
private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
/** Number of data bytes covered by one checksum; replaced from the configuration in setConf(). */
private int bytesPerChecksum = 512;
/** When true, open() returns a stream that verifies data against the checksum file. */
private boolean verifyChecksum = true;
/** When true, create() writes a checksum file next to the data file. */
private boolean writeChecksum = true;
046
047 public static double getApproxChkSumLength(long size) {
048 return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
049 }
050
/**
 * Creates a checksumming view over the given file system.
 *
 * @param fs the underlying (raw) file system that stores both data and
 *           checksum files
 */
public ChecksumFileSystem(FileSystem fs) {
  super(fs);
}
054
055 @Override
056 public void setConf(Configuration conf) {
057 super.setConf(conf);
058 if (conf != null) {
059 bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
060 LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
061 }
062 }
063
064 /**
065 * Set whether to verify checksum.
066 */
067 @Override
068 public void setVerifyChecksum(boolean verifyChecksum) {
069 this.verifyChecksum = verifyChecksum;
070 }
071
072 @Override
073 public void setWriteChecksum(boolean writeChecksum) {
074 this.writeChecksum = writeChecksum;
075 }
076
077 /** get the raw file system */
078 @Override
079 public FileSystem getRawFileSystem() {
080 return fs;
081 }
082
083 /** Return the name of the checksum file associated with a file.*/
084 public Path getChecksumFile(Path file) {
085 return new Path(file.getParent(), "." + file.getName() + ".crc");
086 }
087
088 /** Return true iff file is a checksum file name.*/
089 public static boolean isChecksumFile(Path file) {
090 String name = file.getName();
091 return name.startsWith(".") && name.endsWith(".crc");
092 }
093
094 /** Return the length of the checksum file given the size of the
095 * actual file.
096 **/
097 public long getChecksumFileLength(Path file, long fileSize) {
098 return getChecksumLength(fileSize, getBytesPerSum());
099 }
100
101 /** Return the bytes Per Checksum */
102 public int getBytesPerSum() {
103 return bytesPerChecksum;
104 }
105
106 private int getSumBufferSize(int bytesPerSum, int bufferSize) {
107 int defaultBufferSize = getConf().getInt(
108 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
109 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
110 int proportionalBufferSize = bufferSize / bytesPerSum;
111 return Math.max(bytesPerSum,
112 Math.max(proportionalBufferSize, defaultBufferSize));
113 }
114
115 /*******************************************************
116 * For open()'s FSInputStream
117 * It verifies that data matches checksums.
118 *******************************************************/
119 private static class ChecksumFSInputChecker extends FSInputChecker {
120 private ChecksumFileSystem fs;
121 private FSDataInputStream datas;
122 private FSDataInputStream sums;
123
124 private static final int HEADER_LENGTH = 8;
125
126 private int bytesPerSum = 1;
127
128 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
129 throws IOException {
130 this(fs, file, fs.getConf().getInt(
131 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
132 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
133 }
134
135 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
136 throws IOException {
137 super( file, fs.getFileStatus(file).getReplication() );
138 this.datas = fs.getRawFileSystem().open(file, bufferSize);
139 this.fs = fs;
140 Path sumFile = fs.getChecksumFile(file);
141 try {
142 int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
143 sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
144
145 byte[] version = new byte[CHECKSUM_VERSION.length];
146 sums.readFully(version);
147 if (!Arrays.equals(version, CHECKSUM_VERSION))
148 throw new IOException("Not a checksum file: "+sumFile);
149 this.bytesPerSum = sums.readInt();
150 set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
151 } catch (FileNotFoundException e) { // quietly ignore
152 set(fs.verifyChecksum, null, 1, 0);
153 } catch (IOException e) { // loudly ignore
154 LOG.warn("Problem opening checksum file: "+ file +
155 ". Ignoring exception: " , e);
156 set(fs.verifyChecksum, null, 1, 0);
157 }
158 }
159
160 private long getChecksumFilePos( long dataPos ) {
161 return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
162 }
163
164 @Override
165 protected long getChunkPosition( long dataPos ) {
166 return dataPos/bytesPerSum*bytesPerSum;
167 }
168
169 @Override
170 public int available() throws IOException {
171 return datas.available() + super.available();
172 }
173
174 @Override
175 public int read(long position, byte[] b, int off, int len)
176 throws IOException {
177 // parameter check
178 if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
179 throw new IndexOutOfBoundsException();
180 } else if (len == 0) {
181 return 0;
182 }
183 if( position<0 ) {
184 throw new IllegalArgumentException(
185 "Parameter position can not to be negative");
186 }
187
188 ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
189 checker.seek(position);
190 int nread = checker.read(b, off, len);
191 checker.close();
192 return nread;
193 }
194
195 @Override
196 public void close() throws IOException {
197 datas.close();
198 if( sums != null ) {
199 sums.close();
200 }
201 set(fs.verifyChecksum, null, 1, 0);
202 }
203
204
205 @Override
206 public boolean seekToNewSource(long targetPos) throws IOException {
207 long sumsPos = getChecksumFilePos(targetPos);
208 fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
209 boolean newDataSource = datas.seekToNewSource(targetPos);
210 return sums.seekToNewSource(sumsPos) || newDataSource;
211 }
212
213 @Override
214 protected int readChunk(long pos, byte[] buf, int offset, int len,
215 byte[] checksum) throws IOException {
216
217 boolean eof = false;
218 if (needChecksum()) {
219 assert checksum != null; // we have a checksum buffer
220 assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
221 assert len >= bytesPerSum; // we must read at least one chunk
222
223 final int checksumsToRead = Math.min(
224 len/bytesPerSum, // number of checksums based on len to read
225 checksum.length / CHECKSUM_SIZE); // size of checksum buffer
226 long checksumPos = getChecksumFilePos(pos);
227 if(checksumPos != sums.getPos()) {
228 sums.seek(checksumPos);
229 }
230
231 int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
232 if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
233 throw new ChecksumException(
234 "Checksum file not a length multiple of checksum size " +
235 "in " + file + " at " + pos + " checksumpos: " + checksumPos +
236 " sumLenread: " + sumLenRead,
237 pos);
238 }
239 if (sumLenRead <= 0) { // we're at the end of the file
240 eof = true;
241 } else {
242 // Adjust amount of data to read based on how many checksum chunks we read
243 len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
244 }
245 }
246 if(pos != datas.getPos()) {
247 datas.seek(pos);
248 }
249 int nread = readFully(datas, buf, offset, len);
250 if (eof && nread > 0) {
251 throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
252 }
253 return nread;
254 }
255 }
256
257 private static class FSDataBoundedInputStream extends FSDataInputStream {
258 private FileSystem fs;
259 private Path file;
260 private long fileLen = -1L;
261
262 FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in)
263 throws IOException {
264 super(in);
265 this.fs = fs;
266 this.file = file;
267 }
268
269 @Override
270 public boolean markSupported() {
271 return false;
272 }
273
274 /* Return the file length */
275 private long getFileLength() throws IOException {
276 if( fileLen==-1L ) {
277 fileLen = fs.getContentSummary(file).getLength();
278 }
279 return fileLen;
280 }
281
282 /**
283 * Skips over and discards <code>n</code> bytes of data from the
284 * input stream.
285 *
286 *The <code>skip</code> method skips over some smaller number of bytes
287 * when reaching end of file before <code>n</code> bytes have been skipped.
288 * The actual number of bytes skipped is returned. If <code>n</code> is
289 * negative, no bytes are skipped.
290 *
291 * @param n the number of bytes to be skipped.
292 * @return the actual number of bytes skipped.
293 * @exception IOException if an I/O error occurs.
294 * ChecksumException if the chunk to skip to is corrupted
295 */
296 @Override
297 public synchronized long skip(long n) throws IOException {
298 long curPos = getPos();
299 long fileLength = getFileLength();
300 if( n+curPos > fileLength ) {
301 n = fileLength - curPos;
302 }
303 return super.skip(n);
304 }
305
306 /**
307 * Seek to the given position in the stream.
308 * The next read() will be from that position.
309 *
310 * <p>This method does not allow seek past the end of the file.
311 * This produces IOException.
312 *
313 * @param pos the postion to seek to.
314 * @exception IOException if an I/O error occurs or seeks after EOF
315 * ChecksumException if the chunk to seek to is corrupted
316 */
317
318 @Override
319 public synchronized void seek(long pos) throws IOException {
320 if(pos>getFileLength()) {
321 throw new IOException("Cannot seek after EOF");
322 }
323 super.seek(pos);
324 }
325
326 }
327
328 /**
329 * Opens an FSDataInputStream at the indicated Path.
330 * @param f the file name to open
331 * @param bufferSize the size of the buffer to be used.
332 */
333 @Override
334 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
335 FileSystem fs;
336 InputStream in;
337 if (verifyChecksum) {
338 fs = this;
339 in = new ChecksumFSInputChecker(this, f, bufferSize);
340 } else {
341 fs = getRawFileSystem();
342 in = fs.open(f, bufferSize);
343 }
344 return new FSDataBoundedInputStream(fs, f, in);
345 }
346
347 @Override
348 public FSDataOutputStream append(Path f, int bufferSize,
349 Progressable progress) throws IOException {
350 throw new IOException("Not supported");
351 }
352
353 /**
354 * Calculated the length of the checksum file in bytes.
355 * @param size the length of the data file in bytes
356 * @param bytesPerSum the number of bytes in a checksum block
357 * @return the number of bytes in the checksum file
358 */
359 public static long getChecksumLength(long size, int bytesPerSum) {
360 //the checksum length is equal to size passed divided by bytesPerSum +
361 //bytes written in the beginning of the checksum file.
362 return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
363 CHECKSUM_VERSION.length + 4;
364 }
365
366 /** This class provides an output stream for a checksummed file.
367 * It generates checksums for data. */
368 private static class ChecksumFSOutputSummer extends FSOutputSummer {
369 private FSDataOutputStream datas;
370 private FSDataOutputStream sums;
371 private static final float CHKSUM_AS_FRACTION = 0.01f;
372 private boolean isClosed = false;
373
374 public ChecksumFSOutputSummer(ChecksumFileSystem fs,
375 Path file,
376 boolean overwrite,
377 int bufferSize,
378 short replication,
379 long blockSize,
380 Progressable progress)
381 throws IOException {
382 super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
383 int bytesPerSum = fs.getBytesPerSum();
384 this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
385 replication, blockSize, progress);
386 int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
387 this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true,
388 sumBufferSize, replication,
389 blockSize);
390 sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
391 sums.writeInt(bytesPerSum);
392 }
393
394 @Override
395 public void close() throws IOException {
396 try {
397 flushBuffer();
398 sums.close();
399 datas.close();
400 } finally {
401 isClosed = true;
402 }
403 }
404
405 @Override
406 protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
407 throws IOException {
408 datas.write(b, offset, len);
409 sums.write(checksum);
410 }
411
412 @Override
413 protected void checkClosed() throws IOException {
414 if (isClosed) {
415 throw new ClosedChannelException();
416 }
417 }
418 }
419
420 @Override
421 public FSDataOutputStream create(Path f, FsPermission permission,
422 boolean overwrite, int bufferSize, short replication, long blockSize,
423 Progressable progress) throws IOException {
424 return create(f, permission, overwrite, true, bufferSize,
425 replication, blockSize, progress);
426 }
427
428 private FSDataOutputStream create(Path f, FsPermission permission,
429 boolean overwrite, boolean createParent, int bufferSize,
430 short replication, long blockSize,
431 Progressable progress) throws IOException {
432 Path parent = f.getParent();
433 if (parent != null) {
434 if (!createParent && !exists(parent)) {
435 throw new FileNotFoundException("Parent directory doesn't exist: "
436 + parent);
437 } else if (!mkdirs(parent)) {
438 throw new IOException("Mkdirs failed to create " + parent);
439 }
440 }
441 final FSDataOutputStream out;
442 if (writeChecksum) {
443 out = new FSDataOutputStream(
444 new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
445 blockSize, progress), null);
446 } else {
447 out = fs.create(f, permission, overwrite, bufferSize, replication,
448 blockSize, progress);
449 // remove the checksum file since we aren't writing one
450 Path checkFile = getChecksumFile(f);
451 if (fs.exists(checkFile)) {
452 fs.delete(checkFile, true);
453 }
454 }
455 if (permission != null) {
456 setPermission(f, permission);
457 }
458 return out;
459 }
460
461 @Override
462 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
463 boolean overwrite, int bufferSize, short replication, long blockSize,
464 Progressable progress) throws IOException {
465 return create(f, permission, overwrite, false, bufferSize, replication,
466 blockSize, progress);
467 }
468
469 /**
470 * Set replication for an existing file.
471 * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
472 * @param src file name
473 * @param replication new replication
474 * @throws IOException
475 * @return true if successful;
476 * false if file does not exist or is a directory
477 */
478 @Override
479 public boolean setReplication(Path src, short replication) throws IOException {
480 boolean value = fs.setReplication(src, replication);
481 if (!value)
482 return false;
483
484 Path checkFile = getChecksumFile(src);
485 if (exists(checkFile))
486 fs.setReplication(checkFile, replication);
487
488 return true;
489 }
490
491 /**
492 * Rename files/dirs
493 */
494 @Override
495 public boolean rename(Path src, Path dst) throws IOException {
496 if (fs.isDirectory(src)) {
497 return fs.rename(src, dst);
498 } else {
499 if (fs.isDirectory(dst)) {
500 dst = new Path(dst, src.getName());
501 }
502
503 boolean value = fs.rename(src, dst);
504 if (!value)
505 return false;
506
507 Path srcCheckFile = getChecksumFile(src);
508 Path dstCheckFile = getChecksumFile(dst);
509 if (fs.exists(srcCheckFile)) { //try to rename checksum
510 value = fs.rename(srcCheckFile, dstCheckFile);
511 } else if (fs.exists(dstCheckFile)) {
512 // no src checksum, so remove dst checksum
513 value = fs.delete(dstCheckFile, true);
514 }
515
516 return value;
517 }
518 }
519
520 /**
521 * Implement the delete(Path, boolean) in checksum
522 * file system.
523 */
524 @Override
525 public boolean delete(Path f, boolean recursive) throws IOException{
526 FileStatus fstatus = null;
527 try {
528 fstatus = fs.getFileStatus(f);
529 } catch(FileNotFoundException e) {
530 return false;
531 }
532 if (fstatus.isDirectory()) {
533 //this works since the crcs are in the same
534 //directories and the files. so we just delete
535 //everything in the underlying filesystem
536 return fs.delete(f, recursive);
537 } else {
538 Path checkFile = getChecksumFile(f);
539 if (fs.exists(checkFile)) {
540 fs.delete(checkFile, true);
541 }
542 return fs.delete(f, true);
543 }
544 }
545
546 final private static PathFilter DEFAULT_FILTER = new PathFilter() {
547 @Override
548 public boolean accept(Path file) {
549 return !isChecksumFile(file);
550 }
551 };
552
553 /**
554 * List the statuses of the files/directories in the given path if the path is
555 * a directory.
556 *
557 * @param f
558 * given path
559 * @return the statuses of the files/directories in the given patch
560 * @throws IOException
561 */
562 @Override
563 public FileStatus[] listStatus(Path f) throws IOException {
564 return fs.listStatus(f, DEFAULT_FILTER);
565 }
566
567 /**
568 * List the statuses of the files/directories in the given path if the path is
569 * a directory.
570 *
571 * @param f
572 * given path
573 * @return the statuses of the files/directories in the given patch
574 * @throws IOException
575 */
576 @Override
577 public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
578 throws IOException {
579 return fs.listLocatedStatus(f, DEFAULT_FILTER);
580 }
581
582 @Override
583 public boolean mkdirs(Path f) throws IOException {
584 return fs.mkdirs(f);
585 }
586
587 @Override
588 public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
589 throws IOException {
590 Configuration conf = getConf();
591 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
592 }
593
594 /**
595 * The src file is under FS, and the dst is on the local disk.
596 * Copy it from FS control to the local dst name.
597 */
598 @Override
599 public void copyToLocalFile(boolean delSrc, Path src, Path dst)
600 throws IOException {
601 Configuration conf = getConf();
602 FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
603 }
604
605 /**
606 * The src file is under FS, and the dst is on the local disk.
607 * Copy it from FS control to the local dst name.
608 * If src and dst are directories, the copyCrc parameter
609 * determines whether to copy CRC files.
610 */
611 public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
612 throws IOException {
613 if (!fs.isDirectory(src)) { // source is a file
614 fs.copyToLocalFile(src, dst);
615 FileSystem localFs = getLocal(getConf()).getRawFileSystem();
616 if (localFs.isDirectory(dst)) {
617 dst = new Path(dst, src.getName());
618 }
619 dst = getChecksumFile(dst);
620 if (localFs.exists(dst)) { //remove old local checksum file
621 localFs.delete(dst, true);
622 }
623 Path checksumFile = getChecksumFile(src);
624 if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
625 fs.copyToLocalFile(checksumFile, dst);
626 }
627 } else {
628 FileStatus[] srcs = listStatus(src);
629 for (FileStatus srcFile : srcs) {
630 copyToLocalFile(srcFile.getPath(),
631 new Path(dst, srcFile.getPath().getName()), copyCrc);
632 }
633 }
634 }
635
636 @Override
637 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
638 throws IOException {
639 return tmpLocalFile;
640 }
641
642 @Override
643 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
644 throws IOException {
645 moveFromLocalFile(tmpLocalFile, fsOutputFile);
646 }
647
648 /**
649 * Report a checksum error to the file system.
650 * @param f the file name containing the error
651 * @param in the stream open on the file
652 * @param inPos the position of the beginning of the bad data in the file
653 * @param sums the stream open on the checksum file
654 * @param sumsPos the position of the beginning of the bad data in the checksum file
655 * @return if retry is neccessary
656 */
657 public boolean reportChecksumFailure(Path f, FSDataInputStream in,
658 long inPos, FSDataInputStream sums, long sumsPos) {
659 return false;
660 }
661 }