001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.fs;
020
021 import java.io.EOFException;
022 import java.io.FileNotFoundException;
023 import java.io.IOException;
024 import java.io.InputStream;
025 import java.nio.channels.ClosedChannelException;
026 import java.util.Arrays;
027
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.Configuration;
031 import org.apache.hadoop.fs.permission.FsPermission;
032 import org.apache.hadoop.util.DataChecksum;
033 import org.apache.hadoop.util.Progressable;
034
/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums on the client side.
 *
 *****************************************************************/
042 @InterfaceAudience.Public
043 @InterfaceStability.Stable
044 public abstract class ChecksumFileSystem extends FilterFileSystem {
045 private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
046 private int bytesPerChecksum = 512;
047 private boolean verifyChecksum = true;
048 private boolean writeChecksum = true;
049
050 public static double getApproxChkSumLength(long size) {
051 return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
052 }
053
/**
 * Creates a checksummed view over the given file system.
 *
 * @param fs the file system handed to the superclass constructor
 */
public ChecksumFileSystem(FileSystem fs) {
  super(fs);
}
057
058 @Override
059 public void setConf(Configuration conf) {
060 super.setConf(conf);
061 if (conf != null) {
062 bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
063 LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
064 }
065 }
066
067 /**
068 * Set whether to verify checksum.
069 */
070 @Override
071 public void setVerifyChecksum(boolean verifyChecksum) {
072 this.verifyChecksum = verifyChecksum;
073 }
074
075 @Override
076 public void setWriteChecksum(boolean writeChecksum) {
077 this.writeChecksum = writeChecksum;
078 }
079
080 /** get the raw file system */
081 @Override
082 public FileSystem getRawFileSystem() {
083 return fs;
084 }
085
086 /** Return the name of the checksum file associated with a file.*/
087 public Path getChecksumFile(Path file) {
088 return new Path(file.getParent(), "." + file.getName() + ".crc");
089 }
090
091 /** Return true iff file is a checksum file name.*/
092 public static boolean isChecksumFile(Path file) {
093 String name = file.getName();
094 return name.startsWith(".") && name.endsWith(".crc");
095 }
096
097 /** Return the length of the checksum file given the size of the
098 * actual file.
099 **/
100 public long getChecksumFileLength(Path file, long fileSize) {
101 return getChecksumLength(fileSize, getBytesPerSum());
102 }
103
104 /** Return the bytes Per Checksum */
105 public int getBytesPerSum() {
106 return bytesPerChecksum;
107 }
108
109 private int getSumBufferSize(int bytesPerSum, int bufferSize) {
110 int defaultBufferSize = getConf().getInt(
111 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
112 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
113 int proportionalBufferSize = bufferSize / bytesPerSum;
114 return Math.max(bytesPerSum,
115 Math.max(proportionalBufferSize, defaultBufferSize));
116 }
117
118 /*******************************************************
119 * For open()'s FSInputStream
120 * It verifies that data matches checksums.
121 *******************************************************/
122 private static class ChecksumFSInputChecker extends FSInputChecker {
123 private ChecksumFileSystem fs;
124 private FSDataInputStream datas;
125 private FSDataInputStream sums;
126
127 private static final int HEADER_LENGTH = 8;
128
129 private int bytesPerSum = 1;
130
131 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
132 throws IOException {
133 this(fs, file, fs.getConf().getInt(
134 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
135 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
136 }
137
138 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
139 throws IOException {
140 super( file, fs.getFileStatus(file).getReplication() );
141 this.datas = fs.getRawFileSystem().open(file, bufferSize);
142 this.fs = fs;
143 Path sumFile = fs.getChecksumFile(file);
144 try {
145 int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
146 sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
147
148 byte[] version = new byte[CHECKSUM_VERSION.length];
149 sums.readFully(version);
150 if (!Arrays.equals(version, CHECKSUM_VERSION))
151 throw new IOException("Not a checksum file: "+sumFile);
152 this.bytesPerSum = sums.readInt();
153 set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
154 } catch (FileNotFoundException e) { // quietly ignore
155 set(fs.verifyChecksum, null, 1, 0);
156 } catch (IOException e) { // loudly ignore
157 LOG.warn("Problem opening checksum file: "+ file +
158 ". Ignoring exception: " , e);
159 set(fs.verifyChecksum, null, 1, 0);
160 }
161 }
162
163 private long getChecksumFilePos( long dataPos ) {
164 return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
165 }
166
167 @Override
168 protected long getChunkPosition( long dataPos ) {
169 return dataPos/bytesPerSum*bytesPerSum;
170 }
171
172 @Override
173 public int available() throws IOException {
174 return datas.available() + super.available();
175 }
176
177 @Override
178 public int read(long position, byte[] b, int off, int len)
179 throws IOException {
180 // parameter check
181 if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
182 throw new IndexOutOfBoundsException();
183 } else if (len == 0) {
184 return 0;
185 }
186 if( position<0 ) {
187 throw new IllegalArgumentException(
188 "Parameter position can not to be negative");
189 }
190
191 ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
192 checker.seek(position);
193 int nread = checker.read(b, off, len);
194 checker.close();
195 return nread;
196 }
197
198 @Override
199 public void close() throws IOException {
200 datas.close();
201 if( sums != null ) {
202 sums.close();
203 }
204 set(fs.verifyChecksum, null, 1, 0);
205 }
206
207
208 @Override
209 public boolean seekToNewSource(long targetPos) throws IOException {
210 long sumsPos = getChecksumFilePos(targetPos);
211 fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
212 boolean newDataSource = datas.seekToNewSource(targetPos);
213 return sums.seekToNewSource(sumsPos) || newDataSource;
214 }
215
216 @Override
217 protected int readChunk(long pos, byte[] buf, int offset, int len,
218 byte[] checksum) throws IOException {
219
220 boolean eof = false;
221 if (needChecksum()) {
222 assert checksum != null; // we have a checksum buffer
223 assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
224 assert len >= bytesPerSum; // we must read at least one chunk
225
226 final int checksumsToRead = Math.min(
227 len/bytesPerSum, // number of checksums based on len to read
228 checksum.length / CHECKSUM_SIZE); // size of checksum buffer
229 long checksumPos = getChecksumFilePos(pos);
230 if(checksumPos != sums.getPos()) {
231 sums.seek(checksumPos);
232 }
233
234 int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
235 if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
236 throw new ChecksumException(
237 "Checksum file not a length multiple of checksum size " +
238 "in " + file + " at " + pos + " checksumpos: " + checksumPos +
239 " sumLenread: " + sumLenRead,
240 pos);
241 }
242 if (sumLenRead <= 0) { // we're at the end of the file
243 eof = true;
244 } else {
245 // Adjust amount of data to read based on how many checksum chunks we read
246 len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
247 }
248 }
249 if(pos != datas.getPos()) {
250 datas.seek(pos);
251 }
252 int nread = readFully(datas, buf, offset, len);
253 if (eof && nread > 0) {
254 throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
255 }
256 return nread;
257 }
258 }
259
260 private static class FSDataBoundedInputStream extends FSDataInputStream {
261 private FileSystem fs;
262 private Path file;
263 private long fileLen = -1L;
264
265 FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
266 super(in);
267 this.fs = fs;
268 this.file = file;
269 }
270
271 @Override
272 public boolean markSupported() {
273 return false;
274 }
275
276 /* Return the file length */
277 private long getFileLength() throws IOException {
278 if( fileLen==-1L ) {
279 fileLen = fs.getContentSummary(file).getLength();
280 }
281 return fileLen;
282 }
283
284 /**
285 * Skips over and discards <code>n</code> bytes of data from the
286 * input stream.
287 *
288 *The <code>skip</code> method skips over some smaller number of bytes
289 * when reaching end of file before <code>n</code> bytes have been skipped.
290 * The actual number of bytes skipped is returned. If <code>n</code> is
291 * negative, no bytes are skipped.
292 *
293 * @param n the number of bytes to be skipped.
294 * @return the actual number of bytes skipped.
295 * @exception IOException if an I/O error occurs.
296 * ChecksumException if the chunk to skip to is corrupted
297 */
298 @Override
299 public synchronized long skip(long n) throws IOException {
300 long curPos = getPos();
301 long fileLength = getFileLength();
302 if( n+curPos > fileLength ) {
303 n = fileLength - curPos;
304 }
305 return super.skip(n);
306 }
307
308 /**
309 * Seek to the given position in the stream.
310 * The next read() will be from that position.
311 *
312 * <p>This method does not allow seek past the end of the file.
313 * This produces IOException.
314 *
315 * @param pos the postion to seek to.
316 * @exception IOException if an I/O error occurs or seeks after EOF
317 * ChecksumException if the chunk to seek to is corrupted
318 */
319
320 @Override
321 public synchronized void seek(long pos) throws IOException {
322 if (pos > getFileLength()) {
323 throw new EOFException("Cannot seek after EOF");
324 }
325 super.seek(pos);
326 }
327
328 }
329
330 /**
331 * Opens an FSDataInputStream at the indicated Path.
332 * @param f the file name to open
333 * @param bufferSize the size of the buffer to be used.
334 */
335 @Override
336 public FSDataInputStream open(Path f, int bufferSize) throws IOException {
337 FileSystem fs;
338 InputStream in;
339 if (verifyChecksum) {
340 fs = this;
341 in = new ChecksumFSInputChecker(this, f, bufferSize);
342 } else {
343 fs = getRawFileSystem();
344 in = fs.open(f, bufferSize);
345 }
346 return new FSDataBoundedInputStream(fs, f, in);
347 }
348
349 @Override
350 public FSDataOutputStream append(Path f, int bufferSize,
351 Progressable progress) throws IOException {
352 throw new IOException("Not supported");
353 }
354
355 /**
356 * Calculated the length of the checksum file in bytes.
357 * @param size the length of the data file in bytes
358 * @param bytesPerSum the number of bytes in a checksum block
359 * @return the number of bytes in the checksum file
360 */
361 public static long getChecksumLength(long size, int bytesPerSum) {
362 //the checksum length is equal to size passed divided by bytesPerSum +
363 //bytes written in the beginning of the checksum file.
364 return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
365 CHECKSUM_VERSION.length + 4;
366 }
367
368 /** This class provides an output stream for a checksummed file.
369 * It generates checksums for data. */
370 private static class ChecksumFSOutputSummer extends FSOutputSummer {
371 private FSDataOutputStream datas;
372 private FSDataOutputStream sums;
373 private static final float CHKSUM_AS_FRACTION = 0.01f;
374 private boolean isClosed = false;
375
376 public ChecksumFSOutputSummer(ChecksumFileSystem fs,
377 Path file,
378 boolean overwrite,
379 int bufferSize,
380 short replication,
381 long blockSize,
382 Progressable progress)
383 throws IOException {
384 super(DataChecksum.newCrc32(), fs.getBytesPerSum(), 4);
385 int bytesPerSum = fs.getBytesPerSum();
386 this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize,
387 replication, blockSize, progress);
388 int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
389 this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true,
390 sumBufferSize, replication,
391 blockSize);
392 sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
393 sums.writeInt(bytesPerSum);
394 }
395
396 @Override
397 public void close() throws IOException {
398 try {
399 flushBuffer();
400 sums.close();
401 datas.close();
402 } finally {
403 isClosed = true;
404 }
405 }
406
407 @Override
408 protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
409 throws IOException {
410 datas.write(b, offset, len);
411 sums.write(checksum);
412 }
413
414 @Override
415 protected void checkClosed() throws IOException {
416 if (isClosed) {
417 throw new ClosedChannelException();
418 }
419 }
420 }
421
422 @Override
423 public FSDataOutputStream create(Path f, FsPermission permission,
424 boolean overwrite, int bufferSize, short replication, long blockSize,
425 Progressable progress) throws IOException {
426 return create(f, permission, overwrite, true, bufferSize,
427 replication, blockSize, progress);
428 }
429
430 private FSDataOutputStream create(Path f, FsPermission permission,
431 boolean overwrite, boolean createParent, int bufferSize,
432 short replication, long blockSize,
433 Progressable progress) throws IOException {
434 Path parent = f.getParent();
435 if (parent != null) {
436 if (!createParent && !exists(parent)) {
437 throw new FileNotFoundException("Parent directory doesn't exist: "
438 + parent);
439 } else if (!mkdirs(parent)) {
440 throw new IOException("Mkdirs failed to create " + parent
441 + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
442 + ")");
443 }
444 }
445 final FSDataOutputStream out;
446 if (writeChecksum) {
447 out = new FSDataOutputStream(
448 new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
449 blockSize, progress), null);
450 } else {
451 out = fs.create(f, permission, overwrite, bufferSize, replication,
452 blockSize, progress);
453 // remove the checksum file since we aren't writing one
454 Path checkFile = getChecksumFile(f);
455 if (fs.exists(checkFile)) {
456 fs.delete(checkFile, true);
457 }
458 }
459 if (permission != null) {
460 setPermission(f, permission);
461 }
462 return out;
463 }
464
465 @Override
466 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
467 boolean overwrite, int bufferSize, short replication, long blockSize,
468 Progressable progress) throws IOException {
469 return create(f, permission, overwrite, false, bufferSize, replication,
470 blockSize, progress);
471 }
472
473 /**
474 * Set replication for an existing file.
475 * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
476 * @param src file name
477 * @param replication new replication
478 * @throws IOException
479 * @return true if successful;
480 * false if file does not exist or is a directory
481 */
482 @Override
483 public boolean setReplication(Path src, short replication) throws IOException {
484 boolean value = fs.setReplication(src, replication);
485 if (!value)
486 return false;
487
488 Path checkFile = getChecksumFile(src);
489 if (exists(checkFile))
490 fs.setReplication(checkFile, replication);
491
492 return true;
493 }
494
495 /**
496 * Rename files/dirs
497 */
498 @Override
499 public boolean rename(Path src, Path dst) throws IOException {
500 if (fs.isDirectory(src)) {
501 return fs.rename(src, dst);
502 } else {
503 if (fs.isDirectory(dst)) {
504 dst = new Path(dst, src.getName());
505 }
506
507 boolean value = fs.rename(src, dst);
508 if (!value)
509 return false;
510
511 Path srcCheckFile = getChecksumFile(src);
512 Path dstCheckFile = getChecksumFile(dst);
513 if (fs.exists(srcCheckFile)) { //try to rename checksum
514 value = fs.rename(srcCheckFile, dstCheckFile);
515 } else if (fs.exists(dstCheckFile)) {
516 // no src checksum, so remove dst checksum
517 value = fs.delete(dstCheckFile, true);
518 }
519
520 return value;
521 }
522 }
523
524 /**
525 * Implement the delete(Path, boolean) in checksum
526 * file system.
527 */
528 @Override
529 public boolean delete(Path f, boolean recursive) throws IOException{
530 FileStatus fstatus = null;
531 try {
532 fstatus = fs.getFileStatus(f);
533 } catch(FileNotFoundException e) {
534 return false;
535 }
536 if (fstatus.isDirectory()) {
537 //this works since the crcs are in the same
538 //directories and the files. so we just delete
539 //everything in the underlying filesystem
540 return fs.delete(f, recursive);
541 } else {
542 Path checkFile = getChecksumFile(f);
543 if (fs.exists(checkFile)) {
544 fs.delete(checkFile, true);
545 }
546 return fs.delete(f, true);
547 }
548 }
549
550 final private static PathFilter DEFAULT_FILTER = new PathFilter() {
551 @Override
552 public boolean accept(Path file) {
553 return !isChecksumFile(file);
554 }
555 };
556
557 /**
558 * List the statuses of the files/directories in the given path if the path is
559 * a directory.
560 *
561 * @param f
562 * given path
563 * @return the statuses of the files/directories in the given patch
564 * @throws IOException
565 */
566 @Override
567 public FileStatus[] listStatus(Path f) throws IOException {
568 return fs.listStatus(f, DEFAULT_FILTER);
569 }
570
571 /**
572 * List the statuses of the files/directories in the given path if the path is
573 * a directory.
574 *
575 * @param f
576 * given path
577 * @return the statuses of the files/directories in the given patch
578 * @throws IOException
579 */
580 @Override
581 public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
582 throws IOException {
583 return fs.listLocatedStatus(f, DEFAULT_FILTER);
584 }
585
586 @Override
587 public boolean mkdirs(Path f) throws IOException {
588 return fs.mkdirs(f);
589 }
590
591 @Override
592 public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
593 throws IOException {
594 Configuration conf = getConf();
595 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
596 }
597
598 /**
599 * The src file is under FS, and the dst is on the local disk.
600 * Copy it from FS control to the local dst name.
601 */
602 @Override
603 public void copyToLocalFile(boolean delSrc, Path src, Path dst)
604 throws IOException {
605 Configuration conf = getConf();
606 FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
607 }
608
609 /**
610 * The src file is under FS, and the dst is on the local disk.
611 * Copy it from FS control to the local dst name.
612 * If src and dst are directories, the copyCrc parameter
613 * determines whether to copy CRC files.
614 */
615 public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
616 throws IOException {
617 if (!fs.isDirectory(src)) { // source is a file
618 fs.copyToLocalFile(src, dst);
619 FileSystem localFs = getLocal(getConf()).getRawFileSystem();
620 if (localFs.isDirectory(dst)) {
621 dst = new Path(dst, src.getName());
622 }
623 dst = getChecksumFile(dst);
624 if (localFs.exists(dst)) { //remove old local checksum file
625 localFs.delete(dst, true);
626 }
627 Path checksumFile = getChecksumFile(src);
628 if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
629 fs.copyToLocalFile(checksumFile, dst);
630 }
631 } else {
632 FileStatus[] srcs = listStatus(src);
633 for (FileStatus srcFile : srcs) {
634 copyToLocalFile(srcFile.getPath(),
635 new Path(dst, srcFile.getPath().getName()), copyCrc);
636 }
637 }
638 }
639
640 @Override
641 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
642 throws IOException {
643 return tmpLocalFile;
644 }
645
646 @Override
647 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
648 throws IOException {
649 moveFromLocalFile(tmpLocalFile, fsOutputFile);
650 }
651
652 /**
653 * Report a checksum error to the file system.
654 * @param f the file name containing the error
655 * @param in the stream open on the file
656 * @param inPos the position of the beginning of the bad data in the file
657 * @param sums the stream open on the checksum file
658 * @param sumsPos the position of the beginning of the bad data in the checksum file
659 * @return if retry is neccessary
660 */
661 public boolean reportChecksumFailure(Path f, FSDataInputStream in,
662 long inPos, FSDataInputStream sums, long sumsPos) {
663 return false;
664 }
665 }