001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations under
015 * the License.
016 */
017
018 package org.apache.hadoop.io.file.tfile;
019
020 import java.io.ByteArrayInputStream;
021 import java.io.Closeable;
022 import java.io.DataInput;
023 import java.io.DataInputStream;
024 import java.io.DataOutput;
025 import java.io.DataOutputStream;
026 import java.io.EOFException;
027 import java.io.IOException;
028 import java.io.OutputStream;
029 import java.util.ArrayList;
030 import java.util.Comparator;
031
032 import org.apache.commons.logging.Log;
033 import org.apache.commons.logging.LogFactory;
034 import org.apache.hadoop.classification.InterfaceAudience;
035 import org.apache.hadoop.classification.InterfaceStability;
036 import org.apache.hadoop.conf.Configuration;
037 import org.apache.hadoop.fs.FSDataInputStream;
038 import org.apache.hadoop.fs.FSDataOutputStream;
039 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
040 import org.apache.hadoop.io.BytesWritable;
041 import org.apache.hadoop.io.DataInputBuffer;
042 import org.apache.hadoop.io.DataOutputBuffer;
043 import org.apache.hadoop.io.IOUtils;
044 import org.apache.hadoop.io.RawComparator;
045 import org.apache.hadoop.io.WritableComparator;
046 import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader;
047 import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender;
048 import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder;
049 import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder;
050 import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator;
051 import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator;
052 import org.apache.hadoop.io.file.tfile.Utils.Version;
053 import org.apache.hadoop.io.serializer.JavaSerializationComparator;
054
055 /**
056 * A TFile is a container of key-value pairs. Both keys and values are type-less
057 * bytes. Keys are restricted to 64KB, value length is not restricted
058 * (practically limited to the available disk storage). TFile further provides
059 * the following features:
060 * <ul>
061 * <li>Block Compression.
062 * <li>Named meta data blocks.
063 * <li>Sorted or unsorted keys.
064 * <li>Seek by key or by file offset.
065 * </ul>
066 * The memory footprint of a TFile includes the following:
067 * <ul>
068 * <li>Some constant overhead of reading or writing a compressed block.
069 * <ul>
070 * <li>Each compressed block requires one compression/decompression codec for
071 * I/O.
072 * <li>Temporary space to buffer the key.
073 * <li>Temporary space to buffer the value (for TFile.Writer only). Values are
074 * chunk encoded, so that we buffer at most one chunk of user data. By default,
075 * the chunk buffer is 1MB. Reading chunked value does not require additional
076 * memory.
077 * </ul>
078 * <li>TFile index, which is proportional to the total number of Data Blocks.
079 * The total amount of memory needed to hold the index can be estimated as
080 * (56+AvgKeySize)*NumBlocks.
 * <li>MetaBlock index, which is proportional to the total number of Meta
 * Blocks. The total amount of memory needed to hold the index for Meta Blocks
 * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock.
084 * </ul>
085 * <p>
086 * The behavior of TFile can be customized by the following variables through
087 * Configuration:
088 * <ul>
 * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes).
 * Defaults to 1MB. Values whose length is less than the chunk size are
 * guaranteed to have a known value length at read time (see
 * {@link TFile.Reader.Scanner.Entry#isValueLengthKnown()}).
093 * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for
094 * FSDataOutputStream. Integer (in bytes). Default to 256KB.
095 * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for
096 * FSDataInputStream. Integer (in bytes). Default to 256KB.
097 * </ul>
098 * <p>
099 * Suggestions on performance optimization.
100 * <ul>
101 * <li>Minimum block size. We recommend a setting of minimum block size between
102 * 256KB to 1MB for general usage. Larger block size is preferred if files are
103 * primarily for sequential access. However, it would lead to inefficient random
104 * access (because there are more data to decompress). Smaller blocks are good
105 * for random access, but require more memory to hold the block index, and may
106 * be slower to create (because we must flush the compressor stream at the
107 * conclusion of each data block, which leads to an FS I/O flush). Further, due
108 * to the internal caching in Compression codec, the smallest possible block
109 * size would be around 20KB-30KB.
110 * <li>The current implementation does not offer true multi-threading for
111 * reading. The implementation uses FSDataInputStream seek()+read(), which is
112 * shown to be much faster than positioned-read call in single thread mode.
113 * However, it also means that if multiple threads attempt to access the same
114 * TFile (using multiple scanners) simultaneously, the actual I/O is carried out
115 * sequentially even if they access different DFS blocks.
 * <li>Compression codec. Use "none" if the data is not very compressible (by
 * compressible, we mean a compression ratio of at least 2:1). Generally, use
 * "lzo" as the starting point for experimenting. "gz" offers a slightly better
 * compression ratio than "lzo" but requires 4x CPU to compress and 2x CPU to
 * decompress, compared to "lzo".
 * <li>File system buffering. If the underlying FSDataInputStream and
 * FSDataOutputStream are already adequately buffered, or if applications
 * read/write keys and values in large buffers, we can reduce the sizes of
 * input/output buffering in the TFile layer by setting the configuration
 * parameters "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size".
126 * </ul>
127 *
 * Some design rationale behind TFile can be found at <a
 * href="https://issues.apache.org/jira/browse/HADOOP-3315">Hadoop-3315</a>.
130 */
131 @InterfaceAudience.Public
132 @InterfaceStability.Evolving
133 public class TFile {
// Logger obtained for TFile.class.
static final Log LOG = LogFactory.getLog(TFile.class);

// Configuration key: value chunk size in bytes (read by getChunkBufferSize).
private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size";
// Configuration key: buffer size in bytes used for FSDataInputStream
// (read by getFSInputBufferSize).
private static final String FS_INPUT_BUF_SIZE_ATTR =
    "tfile.fs.input.buffer.size";
// Configuration key: buffer size in bytes used for FSDataOutputStream
// (read by getFSOutputBufferSize).
private static final String FS_OUTPUT_BUF_SIZE_ATTR =
    "tfile.fs.output.buffer.size";
141
142 static int getChunkBufferSize(Configuration conf) {
143 int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024);
144 return (ret > 0) ? ret : 1024 * 1024;
145 }
146
147 static int getFSInputBufferSize(Configuration conf) {
148 return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
149 }
150
151 static int getFSOutputBufferSize(Configuration conf) {
152 return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
153 }
154
// Capacity of the key buffers allocated by Writer: 64KB.
private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB
// Version object (major 1, minor 0) identifying the TFile API.
static final Version API_VERSION = new Version((short) 1, (short) 0);

/** compression: gzip */
public static final String COMPRESSION_GZ = "gz";
/** compression: lzo */
public static final String COMPRESSION_LZO = "lzo";
/** compression: none */
public static final String COMPRESSION_NONE = "none";
/** comparator: memcmp */
public static final String COMPARATOR_MEMCMP = "memcmp";
/** comparator prefix: java class (followed by the comparator class name) */
public static final String COMPARATOR_JCLASS = "jclass:";
168
169 /**
170 * Make a raw comparator from a string name.
171 *
172 * @param name
173 * Comparator name
174 * @return A RawComparable comparator.
175 */
176 static public Comparator<RawComparable> makeComparator(String name) {
177 return TFileMeta.makeComparator(name);
178 }
179
/**
 * Private constructor: TFile only groups static members and nested
 * classes, so it is never instantiated.
 */
private TFile() {
}
184
185 /**
186 * Get names of supported compression algorithms. The names are acceptable by
187 * TFile.Writer.
188 *
189 * @return Array of strings, each represents a supported compression
190 * algorithm. Currently, the following compression algorithms are
191 * supported.
192 * <ul>
193 * <li>"none" - No compression.
194 * <li>"lzo" - LZO compression.
195 * <li>"gz" - GZIP compression.
196 * </ul>
197 */
198 public static String[] getSupportedCompressionAlgorithms() {
199 return Compression.getSupportedAlgorithms();
200 }
201
202 /**
203 * TFile Writer.
204 */
205 @InterfaceStability.Evolving
206 public static class Writer implements Closeable {
// Minimum compressed size for a block; finishDataBlock() closes a block
// once its compressed size reaches this value (or when forced).
private final int sizeMinBlock;

// Meta blocks: the data block index and the TFile meta information, both
// written out by close().
final TFileIndex tfileIndex;
final TFileMeta tfileMeta;

// Reference to the underlying BCFile; set to null by close().
private BCFile.Writer writerBCF;

// Current data block appender; null when no data block is open.
BlockAppender blkAppender;
// Number of records appended to the current data block.
long blkRecordCount;

// Buffers for caching the key. The two buffers are swapped after each key
// is registered, so lastKeyBufferOS holds the most recently added key.
BoundedByteArrayOutputStream currentKeyBufferOS;
BoundedByteArrayOutputStream lastKeyBufferOS;

// Buffer used by the chunk codec; allocated lazily by prepareAppendValue()
// when a value of unknown length is written.
private byte[] valueBuffer;

/**
 * Writer states. The state always transits in circles: READY -> IN_KEY ->
 * END_KEY -> IN_VALUE -> READY.
 */
private enum State {
  READY, // Ready to start a new key-value pair insertion.
  IN_KEY, // In the middle of key insertion.
  END_KEY, // Key insertion complete, ready to insert value.
  IN_VALUE, // In value insertion.
  // ERROR, // Error encountered, cannot continue.
  CLOSED, // TFile already closed.
};

// Current state of Writer.
State state = State.READY;
// Configuration supplied to the constructor; used to size the chunk buffer.
Configuration conf;
// Incremented before and decremented after each guarded operation, so a
// non-zero value means an earlier operation failed; close() then skips the
// normal finish.
long errorCount = 0;
245
/**
 * Create a TFile writer on top of an output stream.
 *
 * @param fsdos
 *          output stream for writing. Must be at position 0.
 * @param minBlockSize
 *          Minimum compressed block size in bytes. A compression block will
 *          not be closed until it reaches this size except for the last
 *          block.
 * @param compressName
 *          Name of the compression algorithm. Must be one of the strings
 *          returned by {@link TFile#getSupportedCompressionAlgorithms()}.
 * @param comparator
 *          Leave comparator as null or empty string if TFile is not sorted.
 *          Otherwise, provide the string name for the comparison algorithm
 *          for keys. Two kinds of comparators are supported.
 *          <ul>
 *          <li>Algorithmic comparator: binary comparators that are language
 *          independent. Currently, only "memcmp" is supported.
 *          <li>Language-specific comparator: binary comparators that can
 *          only be constructed in a specific language. For Java, the syntax
 *          is "jclass:", followed by the class name of the RawComparator.
 *          Currently, we only support RawComparators that can be
 *          constructed through the default constructor (with no
 *          parameters). Parameterized RawComparators such as
 *          {@link WritableComparator} or
 *          {@link JavaSerializationComparator} may not be directly used.
 *          One should write a wrapper class that inherits from such classes
 *          and use its default constructor to perform proper
 *          initialization.
 *          </ul>
 * @param conf
 *          The configuration object.
 * @throws IOException
 *           declared by the signature; presumably raised while setting up
 *           the underlying BCFile writer (not visible here).
 */
public Writer(FSDataOutputStream fsdos, int minBlockSize,
    String compressName, String comparator, Configuration conf)
    throws IOException {
  this.conf = conf;
  this.sizeMinBlock = minBlockSize;

  // The index shares the comparator resolved by the meta block.
  this.tfileMeta = new TFileMeta(comparator);
  this.tfileIndex = new TFileIndex(this.tfileMeta.getComparator());

  this.writerBCF = new BCFile.Writer(fsdos, compressName, conf);

  // Two equally sized key buffers: one being filled, one holding the
  // previous key.
  this.currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
  this.lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
}
293
294 /**
295 * Close the Writer. Resources will be released regardless of the exceptions
296 * being thrown. Future close calls will have no effect.
297 *
298 * The underlying FSDataOutputStream is not closed.
299 */
300 @Override
301 public void close() throws IOException {
302 if ((state == State.CLOSED)) {
303 return;
304 }
305 try {
306 // First try the normal finish.
307 // Terminate upon the first Exception.
308 if (errorCount == 0) {
309 if (state != State.READY) {
310 throw new IllegalStateException(
311 "Cannot close TFile in the middle of key-value insertion.");
312 }
313
314 finishDataBlock(true);
315
316 // first, write out data:TFile.meta
317 BlockAppender outMeta =
318 writerBCF
319 .prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE);
320 try {
321 tfileMeta.write(outMeta);
322 } finally {
323 outMeta.close();
324 }
325
326 // second, write out data:TFile.index
327 BlockAppender outIndex =
328 writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME);
329 try {
330 tfileIndex.write(outIndex);
331 } finally {
332 outIndex.close();
333 }
334
335 writerBCF.close();
336 }
337 } finally {
338 IOUtils.cleanup(LOG, blkAppender, writerBCF);
339 blkAppender = null;
340 writerBCF = null;
341 state = State.CLOSED;
342 }
343 }
344
345 /**
346 * Adding a new key-value pair to the TFile. This is synonymous to
347 * append(key, 0, key.length, value, 0, value.length)
348 *
349 * @param key
350 * Buffer for key.
351 * @param value
352 * Buffer for value.
353 * @throws IOException
354 */
355 public void append(byte[] key, byte[] value) throws IOException {
356 append(key, 0, key.length, value, 0, value.length);
357 }
358
359 /**
360 * Adding a new key-value pair to TFile.
361 *
362 * @param key
363 * buffer for key.
364 * @param koff
365 * offset in key buffer.
366 * @param klen
367 * length of key.
368 * @param value
369 * buffer for value.
370 * @param voff
371 * offset in value buffer.
372 * @param vlen
373 * length of value.
374 * @throws IOException
375 * Upon IO errors.
376 * <p>
377 * If an exception is thrown, the TFile will be in an inconsistent
378 * state. The only legitimate call after that would be close
379 */
380 public void append(byte[] key, int koff, int klen, byte[] value, int voff,
381 int vlen) throws IOException {
382 if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) {
383 throw new IndexOutOfBoundsException(
384 "Bad key buffer offset-length combination.");
385 }
386
387 if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) {
388 throw new IndexOutOfBoundsException(
389 "Bad value buffer offset-length combination.");
390 }
391
392 try {
393 DataOutputStream dosKey = prepareAppendKey(klen);
394 try {
395 ++errorCount;
396 dosKey.write(key, koff, klen);
397 --errorCount;
398 } finally {
399 dosKey.close();
400 }
401
402 DataOutputStream dosValue = prepareAppendValue(vlen);
403 try {
404 ++errorCount;
405 dosValue.write(value, voff, vlen);
406 --errorCount;
407 } finally {
408 dosValue.close();
409 }
410 } finally {
411 state = State.READY;
412 }
413 }
414
415 /**
416 * Helper class to register key after close call on key append stream.
417 */
418 private class KeyRegister extends DataOutputStream {
419 private final int expectedLength;
420 private boolean closed = false;
421
422 public KeyRegister(int len) {
423 super(currentKeyBufferOS);
424 if (len >= 0) {
425 currentKeyBufferOS.reset(len);
426 } else {
427 currentKeyBufferOS.reset();
428 }
429 expectedLength = len;
430 }
431
432 @Override
433 public void close() throws IOException {
434 if (closed == true) {
435 return;
436 }
437
438 try {
439 ++errorCount;
440 byte[] key = currentKeyBufferOS.getBuffer();
441 int len = currentKeyBufferOS.size();
442 /**
443 * verify length.
444 */
445 if (expectedLength >= 0 && expectedLength != len) {
446 throw new IOException("Incorrect key length: expected="
447 + expectedLength + " actual=" + len);
448 }
449
450 Utils.writeVInt(blkAppender, len);
451 blkAppender.write(key, 0, len);
452 if (tfileIndex.getFirstKey() == null) {
453 tfileIndex.setFirstKey(key, 0, len);
454 }
455
456 if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
457 byte[] lastKey = lastKeyBufferOS.getBuffer();
458 int lastLen = lastKeyBufferOS.size();
459 if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
460 lastLen) < 0) {
461 throw new IOException("Keys are not added in sorted order");
462 }
463 }
464
465 BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
466 currentKeyBufferOS = lastKeyBufferOS;
467 lastKeyBufferOS = tmp;
468 --errorCount;
469 } finally {
470 closed = true;
471 state = State.END_KEY;
472 }
473 }
474 }
475
476 /**
477 * Helper class to register value after close call on value append stream.
478 */
479 private class ValueRegister extends DataOutputStream {
480 private boolean closed = false;
481
482 public ValueRegister(OutputStream os) {
483 super(os);
484 }
485
486 // Avoiding flushing call to down stream.
487 @Override
488 public void flush() {
489 // do nothing
490 }
491
492 @Override
493 public void close() throws IOException {
494 if (closed == true) {
495 return;
496 }
497
498 try {
499 ++errorCount;
500 super.close();
501 blkRecordCount++;
502 // bump up the total record count in the whole file
503 tfileMeta.incRecordCount();
504 finishDataBlock(false);
505 --errorCount;
506 } finally {
507 closed = true;
508 state = State.READY;
509 }
510 }
511 }
512
513 /**
514 * Obtain an output stream for writing a key into TFile. This may only be
515 * called when there is no active Key appending stream or value appending
516 * stream.
517 *
518 * @param length
519 * The expected length of the key. If length of the key is not
520 * known, set length = -1. Otherwise, the application must write
521 * exactly as many bytes as specified here before calling close on
522 * the returned output stream.
523 * @return The key appending output stream.
524 * @throws IOException
525 *
526 */
527 public DataOutputStream prepareAppendKey(int length) throws IOException {
528 if (state != State.READY) {
529 throw new IllegalStateException("Incorrect state to start a new key: "
530 + state.name());
531 }
532
533 initDataBlock();
534 DataOutputStream ret = new KeyRegister(length);
535 state = State.IN_KEY;
536 return ret;
537 }
538
539 /**
540 * Obtain an output stream for writing a value into TFile. This may only be
541 * called right after a key appending operation (the key append stream must
542 * be closed).
543 *
544 * @param length
545 * The expected length of the value. If length of the value is not
546 * known, set length = -1. Otherwise, the application must write
547 * exactly as many bytes as specified here before calling close on
548 * the returned output stream. Advertising the value size up-front
549 * guarantees that the value is encoded in one chunk, and avoids
550 * intermediate chunk buffering.
551 * @throws IOException
552 *
553 */
554 public DataOutputStream prepareAppendValue(int length) throws IOException {
555 if (state != State.END_KEY) {
556 throw new IllegalStateException(
557 "Incorrect state to start a new value: " + state.name());
558 }
559
560 DataOutputStream ret;
561
562 // unknown length
563 if (length < 0) {
564 if (valueBuffer == null) {
565 valueBuffer = new byte[getChunkBufferSize(conf)];
566 }
567 ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer));
568 } else {
569 ret =
570 new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length));
571 }
572
573 state = State.IN_VALUE;
574 return ret;
575 }
576
577 /**
578 * Obtain an output stream for creating a meta block. This function may not
579 * be called when there is a key append stream or value append stream
580 * active. No more key-value insertion is allowed after a meta data block
581 * has been added to TFile.
582 *
583 * @param name
584 * Name of the meta block.
585 * @param compressName
586 * Name of the compression algorithm to be used. Must be one of the
587 * strings returned by
588 * {@link TFile#getSupportedCompressionAlgorithms()}.
589 * @return A DataOutputStream that can be used to write Meta Block data.
590 * Closing the stream would signal the ending of the block.
591 * @throws IOException
592 * @throws MetaBlockAlreadyExists
593 * the Meta Block with the same name already exists.
594 */
595 public DataOutputStream prepareMetaBlock(String name, String compressName)
596 throws IOException, MetaBlockAlreadyExists {
597 if (state != State.READY) {
598 throw new IllegalStateException(
599 "Incorrect state to start a Meta Block: " + state.name());
600 }
601
602 finishDataBlock(true);
603 DataOutputStream outputStream =
604 writerBCF.prepareMetaBlock(name, compressName);
605 return outputStream;
606 }
607
608 /**
609 * Obtain an output stream for creating a meta block. This function may not
610 * be called when there is a key append stream or value append stream
611 * active. No more key-value insertion is allowed after a meta data block
612 * has been added to TFile. Data will be compressed using the default
613 * compressor as defined in Writer's constructor.
614 *
615 * @param name
616 * Name of the meta block.
617 * @return A DataOutputStream that can be used to write Meta Block data.
618 * Closing the stream would signal the ending of the block.
619 * @throws IOException
620 * @throws MetaBlockAlreadyExists
621 * the Meta Block with the same name already exists.
622 */
623 public DataOutputStream prepareMetaBlock(String name) throws IOException,
624 MetaBlockAlreadyExists {
625 if (state != State.READY) {
626 throw new IllegalStateException(
627 "Incorrect state to start a Meta Block: " + state.name());
628 }
629
630 finishDataBlock(true);
631 return writerBCF.prepareMetaBlock(name);
632 }
633
634 /**
635 * Check if we need to start a new data block.
636 *
637 * @throws IOException
638 */
639 private void initDataBlock() throws IOException {
640 // for each new block, get a new appender
641 if (blkAppender == null) {
642 blkAppender = writerBCF.prepareDataBlock();
643 }
644 }
645
646 /**
647 * Close the current data block if necessary.
648 *
649 * @param bForceFinish
650 * Force the closure regardless of the block size.
651 * @throws IOException
652 */
653 void finishDataBlock(boolean bForceFinish) throws IOException {
654 if (blkAppender == null) {
655 return;
656 }
657
658 // exceeded the size limit, do the compression and finish the block
659 if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) {
660 // keep tracks of the last key of each data block, no padding
661 // for now
662 TFileIndexEntry keyLast =
663 new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS
664 .size(), blkRecordCount);
665 tfileIndex.addEntry(keyLast);
666 // close the appender
667 blkAppender.close();
668 blkAppender = null;
669 blkRecordCount = 0;
670 }
671 }
672 }
673
/**
 * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner
 * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}),
 * a portion of the TFile based on byte offsets
 * ({@link Reader#createScannerByByteRange(long, long)}), or a portion of the
 * TFile whose keys fall in a certain key range (for sorted TFiles only,
 * {@link Reader#createScannerByKey(byte[], byte[])} or
 * {@link Reader#createScannerByKey(RawComparable, RawComparable)}).
 */
683 @InterfaceStability.Evolving
684 public static class Reader implements Closeable {
// The underlying BCFile reader.
final BCFile.Reader readerBCF;

// TFile index; it is loaded lazily by checkTFileDataIndex().
TFileIndex tfileIndex = null;
// TFile meta information, read from the meta block in the constructor.
final TFileMeta tfileMeta;
// Key comparator taken from tfileMeta.
final BytesComparator comparator;

// Global begin and end locations: block 0 / record 0, and
// block <block count> / record 0 respectively.
private final Location begin;
private final Location end;
696
697 /**
698 * Location representing a virtual position in the TFile.
699 */
700 static final class Location implements Comparable<Location>, Cloneable {
701 private int blockIndex;
702 // distance/offset from the beginning of the block
703 private long recordIndex;
704
705 Location(int blockIndex, long recordIndex) {
706 set(blockIndex, recordIndex);
707 }
708
709 void incRecordIndex() {
710 ++recordIndex;
711 }
712
713 Location(Location other) {
714 set(other);
715 }
716
717 int getBlockIndex() {
718 return blockIndex;
719 }
720
721 long getRecordIndex() {
722 return recordIndex;
723 }
724
725 void set(int blockIndex, long recordIndex) {
726 if ((blockIndex | recordIndex) < 0) {
727 throw new IllegalArgumentException(
728 "Illegal parameter for BlockLocation.");
729 }
730 this.blockIndex = blockIndex;
731 this.recordIndex = recordIndex;
732 }
733
734 void set(Location other) {
735 set(other.blockIndex, other.recordIndex);
736 }
737
738 /**
739 * @see java.lang.Comparable#compareTo(java.lang.Object)
740 */
741 @Override
742 public int compareTo(Location other) {
743 return compareTo(other.blockIndex, other.recordIndex);
744 }
745
746 int compareTo(int bid, long rid) {
747 if (this.blockIndex == bid) {
748 long ret = this.recordIndex - rid;
749 if (ret > 0) return 1;
750 if (ret < 0) return -1;
751 return 0;
752 }
753 return this.blockIndex - bid;
754 }
755
756 /**
757 * @see java.lang.Object#clone()
758 */
759 @Override
760 protected Location clone() {
761 return new Location(blockIndex, recordIndex);
762 }
763
764 /**
765 * @see java.lang.Object#hashCode()
766 */
767 @Override
768 public int hashCode() {
769 final int prime = 31;
770 int result = prime + blockIndex;
771 result = (int) (prime * result + recordIndex);
772 return result;
773 }
774
775 /**
776 * @see java.lang.Object#equals(java.lang.Object)
777 */
778 @Override
779 public boolean equals(Object obj) {
780 if (this == obj) return true;
781 if (obj == null) return false;
782 if (getClass() != obj.getClass()) return false;
783 Location other = (Location) obj;
784 if (blockIndex != other.blockIndex) return false;
785 if (recordIndex != other.recordIndex) return false;
786 return true;
787 }
788 }
789
790 /**
791 * Constructor
792 *
793 * @param fsdis
794 * FS input stream of the TFile.
795 * @param fileLength
796 * The length of TFile. This is required because we have no easy
797 * way of knowing the actual size of the input file through the
798 * File input stream.
799 * @param conf
800 * @throws IOException
801 */
802 public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf)
803 throws IOException {
804 readerBCF = new BCFile.Reader(fsdis, fileLength, conf);
805
806 // first, read TFile meta
807 BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME);
808 try {
809 tfileMeta = new TFileMeta(brMeta);
810 } finally {
811 brMeta.close();
812 }
813
814 comparator = tfileMeta.getComparator();
815 // Set begin and end locations.
816 begin = new Location(0, 0);
817 end = new Location(readerBCF.getBlockCount(), 0);
818 }
819
820 /**
821 * Close the reader. The state of the Reader object is undefined after
822 * close. Calling close() for multiple times has no effect.
823 */
824 @Override
825 public void close() throws IOException {
826 readerBCF.close();
827 }
828
829 /**
830 * Get the begin location of the TFile.
831 *
832 * @return If TFile is not empty, the location of the first key-value pair.
833 * Otherwise, it returns end().
834 */
835 Location begin() {
836 return begin;
837 }
838
839 /**
840 * Get the end location of the TFile.
841 *
842 * @return The location right after the last key-value pair in TFile.
843 */
844 Location end() {
845 return end;
846 }
847
848 /**
849 * Get the string representation of the comparator.
850 *
851 * @return If the TFile is not sorted by keys, an empty string will be
852 * returned. Otherwise, the actual comparator string that is
853 * provided during the TFile creation time will be returned.
854 */
855 public String getComparatorName() {
856 return tfileMeta.getComparatorString();
857 }
858
859 /**
860 * Is the TFile sorted?
861 *
862 * @return true if TFile is sorted.
863 */
864 public boolean isSorted() {
865 return tfileMeta.isSorted();
866 }
867
868 /**
869 * Get the number of key-value pair entries in TFile.
870 *
871 * @return the number of key-value pairs in TFile
872 */
873 public long getEntryCount() {
874 return tfileMeta.getRecordCount();
875 }
876
877 /**
878 * Lazily loading the TFile index.
879 *
880 * @throws IOException
881 */
882 synchronized void checkTFileDataIndex() throws IOException {
883 if (tfileIndex == null) {
884 BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME);
885 try {
886 tfileIndex =
887 new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta
888 .getComparator());
889 } finally {
890 brIndex.close();
891 }
892 }
893 }
894
895 /**
896 * Get the first key in the TFile.
897 *
898 * @return The first key in the TFile.
899 * @throws IOException
900 */
901 public RawComparable getFirstKey() throws IOException {
902 checkTFileDataIndex();
903 return tfileIndex.getFirstKey();
904 }
905
906 /**
907 * Get the last key in the TFile.
908 *
909 * @return The last key in the TFile.
910 * @throws IOException
911 */
912 public RawComparable getLastKey() throws IOException {
913 checkTFileDataIndex();
914 return tfileIndex.getLastKey();
915 }
916
917 /**
918 * Get a Comparator object to compare Entries. It is useful when you want
919 * stores the entries in a collection (such as PriorityQueue) and perform
920 * sorting or comparison among entries based on the keys without copying out
921 * the key.
922 *
923 * @return An Entry Comparator..
924 */
925 public Comparator<Scanner.Entry> getEntryComparator() {
926 if (!isSorted()) {
927 throw new RuntimeException(
928 "Entries are not comparable for unsorted TFiles");
929 }
930
931 return new Comparator<Scanner.Entry>() {
932 /**
933 * Provide a customized comparator for Entries. This is useful if we
934 * have a collection of Entry objects. However, if the Entry objects
935 * come from different TFiles, users must ensure that those TFiles share
936 * the same RawComparator.
937 */
938 @Override
939 public int compare(Scanner.Entry o1, Scanner.Entry o2) {
940 return comparator.compare(o1.getKeyBuffer(), 0, o1.getKeyLength(), o2
941 .getKeyBuffer(), 0, o2.getKeyLength());
942 }
943 };
944 }
945
946 /**
947 * Get an instance of the RawComparator that is constructed based on the
948 * string comparator representation.
949 *
950 * @return a Comparator that can compare RawComparable's.
951 */
952 public Comparator<RawComparable> getComparator() {
953 return comparator;
954 }
955
956 /**
957 * Stream access to a meta block.``
958 *
959 * @param name
960 * The name of the meta block.
961 * @return The input stream.
962 * @throws IOException
963 * on I/O error.
964 * @throws MetaBlockDoesNotExist
965 * If the meta block with the name does not exist.
966 */
967 public DataInputStream getMetaBlock(String name) throws IOException,
968 MetaBlockDoesNotExist {
969 return readerBCF.getMetaBlock(name);
970 }
971
972 /**
973 * if greater is true then returns the beginning location of the block
974 * containing the key strictly greater than input key. if greater is false
975 * then returns the beginning location of the block greater than equal to
976 * the input key
977 *
978 * @param key
979 * the input key
980 * @param greater
981 * boolean flag
982 * @return
983 * @throws IOException
984 */
985 Location getBlockContainsKey(RawComparable key, boolean greater)
986 throws IOException {
987 if (!isSorted()) {
988 throw new RuntimeException("Seeking in unsorted TFile");
989 }
990 checkTFileDataIndex();
991 int blkIndex =
992 (greater) ? tfileIndex.upperBound(key) : tfileIndex.lowerBound(key);
993 if (blkIndex < 0) return end;
994 return new Location(blkIndex, 0);
995 }
996
/**
 * Translate a record number into a Location by asking the TFile data
 * index.
 *
 * @param recNum
 *          the record number to translate.
 * @return the Location the index reports for {@code recNum}.
 * @throws IOException
 *           propagated from the index check or the index lookup.
 */
Location getLocationByRecordNum(long recNum) throws IOException {
  // make sure the data index is available before consulting it.
  checkTFileDataIndex();
  return tfileIndex.getLocationByRecordNum(recNum);
}
1001
/**
 * Translate a Location into a record number by asking the TFile data
 * index.
 *
 * @param location
 *          the location to translate.
 * @return the record number the index reports for {@code location}.
 * @throws IOException
 *           propagated from the index check or the index lookup.
 */
long getRecordNumByLocation(Location location) throws IOException {
  // make sure the data index is available before consulting it.
  checkTFileDataIndex();
  return tfileIndex.getRecordNumByLocation(location);
}
1006
/**
 * Compare two keys, each given as a byte range, with this TFile's
 * comparator.
 *
 * @param a
 *          buffer holding the first key.
 * @param o1
 *          offset of the first key in {@code a}.
 * @param l1
 *          length of the first key.
 * @param b
 *          buffer holding the second key.
 * @param o2
 *          offset of the second key in {@code b}.
 * @param l2
 *          length of the second key.
 * @return the value returned by the comparator for the two byte ranges.
 * @throws RuntimeException
 *           if the TFile is not sorted.
 */
int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
  if (!isSorted()) {
    throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
  }
  return comparator.compare(a, o1, l1, b, o2, l2);
}
1013
/**
 * Compare two RawComparable keys with this TFile's comparator.
 *
 * @param a
 *          the first key.
 * @param b
 *          the second key.
 * @return the value returned by the comparator for the two keys.
 * @throws RuntimeException
 *           if the TFile is not sorted.
 */
int compareKeys(RawComparable a, RawComparable b) {
  if (!isSorted()) {
    throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
  }
  return comparator.compare(a, b);
}
1020
1021 /**
1022 * Get the location pointing to the beginning of the first key-value pair in
1023 * a compressed block whose byte offset in the TFile is greater than or
1024 * equal to the specified offset.
1025 *
1026 * @param offset
1027 * the user supplied offset.
1028 * @return the location to the corresponding entry; or end() if no such
1029 * entry exists.
1030 */
1031 Location getLocationNear(long offset) {
1032 int blockIndex = readerBCF.getBlockIndexNear(offset);
1033 if (blockIndex == -1) return end;
1034 return new Location(blockIndex, 0);
1035 }
1036
/**
 * Get the RecordNum for the first key-value pair in a compressed block
 * whose byte offset in the TFile is greater than or equal to the specified
 * offset.
 * <p>
 * Implemented as {@code getRecordNumByLocation(getLocationNear(offset))}.
 *
 * @param offset
 *          the user supplied offset.
 * @return the RecordNum to the corresponding entry. If no such entry
 *         exists, it returns the total entry count.
 * @throws IOException
 *           propagated from the location-to-record-number translation.
 */
public long getRecordNumNear(long offset) throws IOException {
  return getRecordNumByLocation(getLocationNear(offset));
}
1051
1052 /**
1053 * Get a sample key that is within a block whose starting offset is greater
1054 * than or equal to the specified offset.
1055 *
1056 * @param offset
1057 * The file offset.
1058 * @return the key that fits the requirement; or null if no such key exists
1059 * (which could happen if the offset is close to the end of the
1060 * TFile).
1061 * @throws IOException
1062 */
1063 public RawComparable getKeyNear(long offset) throws IOException {
1064 int blockIndex = readerBCF.getBlockIndexNear(offset);
1065 if (blockIndex == -1) return null;
1066 checkTFileDataIndex();
1067 return new ByteArray(tfileIndex.getEntry(blockIndex).key);
1068 }
1069
/**
 * Get a scanner that can scan the whole TFile, i.e. from the reader's
 * {@code begin} location to its {@code end} location.
 *
 * @return The scanner object. A valid Scanner is always returned even if
 *         the TFile is empty.
 * @throws IOException
 *           propagated from the Scanner constructor.
 */
public Scanner createScanner() throws IOException {
  return new Scanner(this, begin, end);
}
1080
1081 /**
1082 * Get a scanner that covers a portion of TFile based on byte offsets.
1083 *
1084 * @param offset
1085 * The beginning byte offset in the TFile.
1086 * @param length
1087 * The length of the region.
1088 * @return The actual coverage of the returned scanner tries to match the
1089 * specified byte-region but always round up to the compression
1090 * block boundaries. It is possible that the returned scanner
1091 * contains zero key-value pairs even if length is positive.
1092 * @throws IOException
1093 */
1094 public Scanner createScannerByByteRange(long offset, long length) throws IOException {
1095 return new Scanner(this, offset, offset + length);
1096 }
1097
/**
 * Get a scanner that covers a portion of TFile based on keys. This
 * overload only forwards to {@link #createScannerByKey(byte[], byte[])}.
 *
 * @param beginKey
 *          Begin key of the scan (inclusive). If null, scan from the first
 *          key-value entry of the TFile.
 * @param endKey
 *          End key of the scan (exclusive). If null, scan up to the last
 *          key-value entry of the TFile.
 * @return The actual coverage of the returned scanner will cover all keys
 *         greater than or equal to the beginKey and less than the endKey.
 * @throws IOException
 *           propagated from {@link #createScannerByKey(byte[], byte[])}.
 *
 * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead.
 */
@Deprecated
public Scanner createScanner(byte[] beginKey, byte[] endKey)
    throws IOException {
  return createScannerByKey(beginKey, endKey);
}
1118
1119 /**
1120 * Get a scanner that covers a portion of TFile based on keys.
1121 *
1122 * @param beginKey
1123 * Begin key of the scan (inclusive). If null, scan from the first
1124 * key-value entry of the TFile.
1125 * @param endKey
1126 * End key of the scan (exclusive). If null, scan up to the last
1127 * key-value entry of the TFile.
1128 * @return The actual coverage of the returned scanner will cover all keys
1129 * greater than or equal to the beginKey and less than the endKey.
1130 * @throws IOException
1131 */
1132 public Scanner createScannerByKey(byte[] beginKey, byte[] endKey)
1133 throws IOException {
1134 return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey,
1135 0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
1136 0, endKey.length));
1137 }
1138
/**
 * Get a scanner that covers a specific key range. This overload only
 * forwards to {@link #createScannerByKey(RawComparable, RawComparable)}.
 *
 * @param beginKey
 *          Begin key of the scan (inclusive). If null, scan from the first
 *          key-value entry of the TFile.
 * @param endKey
 *          End key of the scan (exclusive). If null, scan up to the last
 *          key-value entry of the TFile.
 * @return The actual coverage of the returned scanner will cover all keys
 *         greater than or equal to the beginKey and less than the endKey.
 * @throws IOException
 *           propagated from
 *           {@link #createScannerByKey(RawComparable, RawComparable)}.
 *
 * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)}
 *             instead.
 */
@Deprecated
public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
    throws IOException {
  return createScannerByKey(beginKey, endKey);
}
1160
1161 /**
1162 * Get a scanner that covers a specific key range.
1163 *
1164 * @param beginKey
1165 * Begin key of the scan (inclusive). If null, scan from the first
1166 * key-value entry of the TFile.
1167 * @param endKey
1168 * End key of the scan (exclusive). If null, scan up to the last
1169 * key-value entry of the TFile.
1170 * @return The actual coverage of the returned scanner will cover all keys
1171 * greater than or equal to the beginKey and less than the endKey.
1172 * @throws IOException
1173 */
1174 public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey)
1175 throws IOException {
1176 if ((beginKey != null) && (endKey != null)
1177 && (compareKeys(beginKey, endKey) >= 0)) {
1178 return new Scanner(this, beginKey, beginKey);
1179 }
1180 return new Scanner(this, beginKey, endKey);
1181 }
1182
1183 /**
1184 * Create a scanner that covers a range of records.
1185 *
1186 * @param beginRecNum
1187 * The RecordNum for the first record (inclusive).
1188 * @param endRecNum
1189 * The RecordNum for the last record (exclusive). To scan the whole
1190 * file, either specify endRecNum==-1 or endRecNum==getEntryCount().
1191 * @return The TFile scanner that covers the specified range of records.
1192 * @throws IOException
1193 */
1194 public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum)
1195 throws IOException {
1196 if (beginRecNum < 0) beginRecNum = 0;
1197 if (endRecNum < 0 || endRecNum > getEntryCount()) {
1198 endRecNum = getEntryCount();
1199 }
1200 return new Scanner(this, getLocationByRecordNum(beginRecNum),
1201 getLocationByRecordNum(endRecNum));
1202 }
1203
1204 /**
1205 * The TFile Scanner. The Scanner has an implicit cursor, which, upon
1206 * creation, points to the first key-value pair in the scan range. If the
1207 * scan range is empty, the cursor will point to the end of the scan range.
1208 * <p>
1209 * Use {@link Scanner#atEnd()} to test whether the cursor is at the end
1210 * location of the scanner.
1211 * <p>
1212 * Use {@link Scanner#advance()} to move the cursor to the next key-value
1213 * pair (or end if none exists). Use seekTo methods (
1214 * {@link Scanner#seekTo(byte[])} or
1215 * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary
1216 * location in the covered range (including backward seeking). Use
1217 * {@link Scanner#rewind()} to seek back to the beginning of the scanner.
1218 * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner.
1219 * <p>
1220 * Actual keys and values may be obtained through {@link Scanner.Entry}
1221 * object, which is obtained through {@link Scanner#entry()}.
1222 */
1223 public static class Scanner implements Closeable {
// The underlying TFile reader.
final Reader reader;
// Reader of the block the cursor is in; null while the cursor is parked at
// the end location.
private BlockReader blkReader;

// Scan range: beginLocation is inclusive, endLocation is exclusive
// (atEnd() is true once currentLocation reaches endLocation).
Location beginLocation;
Location endLocation;
// Position of the cursor.
Location currentLocation;

// flag to ensure value is only examined once.
boolean valueChecked = false;
// reusable buffer for keys.
final byte[] keyBuffer;
// length of key, -1 means key is invalid.
int klen = -1;

// Largest number of bytes Entry.writeValue() moves per iteration.
static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024;
// Scratch buffer reused by Entry.writeValue().
BytesWritable valTransferBuffer;

// Reusable stream returned by Entry.getKeyStream().
DataInputBuffer keyDataInputStream;
// Decoder positioned on the value of the current record by checkKey().
ChunkDecoder valueBufferInputStream;
// DataInputStream view of valueBufferInputStream, returned by
// Entry.getValueStream().
DataInputStream valueDataInputStream;
// vlen == -1 if unknown.
int vlen;
1248
/**
 * Constructor for a scan over a byte range.
 * <p>
 * The offsets will be rounded to the beginning of a compressed block whose
 * offset is greater than or equal to the specified offset (both offsets
 * are translated with {@code reader.getLocationNear}).
 *
 * @param reader
 *          The TFile reader object.
 * @param offBegin
 *          Begin byte-offset of the scan.
 * @param offEnd
 *          End byte-offset of the scan.
 * @throws IOException
 *           propagated from the location-based constructor.
 */
protected Scanner(Reader reader, long offBegin, long offEnd)
    throws IOException {
  this(reader, reader.getLocationNear(offBegin), reader
      .getLocationNear(offEnd));
}
1269
/**
 * Constructor for a scan between two locations.
 * <p>
 * The scanner keeps private copies of {@code begin} and {@code end}, so
 * later adjustments of the scan range never write through to Location
 * objects owned by the caller (for example the Reader's own begin/end
 * locations). If the range is empty the cursor starts at the end location;
 * otherwise the begin block is loaded and the cursor is advanced to the
 * begin record.
 *
 * @param reader
 *          The TFile reader object.
 * @param begin
 *          Begin location of the scan.
 * @param end
 *          End location of the scan.
 * @throws IOException
 *           propagated from the index check or from positioning the
 *           cursor.
 */
Scanner(Reader reader, Location begin, Location end) throws IOException {
  this.reader = reader;
  // ensure the TFile index is loaded throughout the life of scanner.
  reader.checkTFileDataIndex();
  // Defensive copies: the key-range constructor updates these locations in
  // place with set(...), which must not alter objects shared with others.
  beginLocation = new Location(begin);
  endLocation = new Location(end);

  valTransferBuffer = new BytesWritable();
  // TODO: remember the longest key in a TFile, and use it to replace
  // MAX_KEY_SIZE.
  keyBuffer = new byte[MAX_KEY_SIZE];
  keyDataInputStream = new DataInputBuffer();
  valueBufferInputStream = new ChunkDecoder();
  valueDataInputStream = new DataInputStream(valueBufferInputStream);

  if (beginLocation.compareTo(endLocation) >= 0) {
    // empty range: park the cursor at the end without opening a block.
    currentLocation = new Location(endLocation);
  } else {
    currentLocation = new Location(0, 0);
    initBlock(beginLocation.getBlockIndex());
    inBlockAdvance(beginLocation.getRecordIndex());
  }
}
1304
/**
 * Constructor for a scan over a key range.
 * <p>
 * The scanner is first created from the start of the block selected for
 * {@code beginKey} (or the reader's begin location) up to the reader's end
 * location; the begin and end locations are then narrowed by seeking to
 * the keys.
 *
 * @param reader
 *          The TFile reader object.
 * @param beginKey
 *          Begin key of the scan. If null, scan from the first
 *          {@code <K,V>} entry of the TFile.
 * @param endKey
 *          End key of the scan. If null, scan up to the last
 *          {@code <K,V>} entry of the TFile.
 * @throws IOException
 *           propagated from the location-based constructor or from the
 *           seeks.
 */
protected Scanner(Reader reader, RawComparable beginKey,
    RawComparable endKey) throws IOException {
  this(reader, (beginKey == null) ? reader.begin() : reader
      .getBlockContainsKey(beginKey, false), reader.end());
  if (beginKey != null) {
    // move from the start of the block to the first key >= beginKey and
    // make that position the begin of the scan range.
    inBlockAdvance(beginKey, false);
    // NOTE(review): set(...) mutates beginLocation in place; verify the
    // object is not shared with the Reader.
    beginLocation.set(currentLocation);
  }
  if (endKey != null) {
    // find the first key >= endKey, make it the (exclusive) end of the
    // scan range, then return the cursor to the begin of the range.
    seekTo(endKey, false);
    // NOTE(review): set(...) mutates endLocation in place; verify the
    // object is not shared with the Reader.
    endLocation.set(currentLocation);
    seekTo(beginLocation);
  }
}
1332
/**
 * Move the cursor to the first entry whose key is greater than or equal
 * to the input key. Synonymous to seekTo(key, 0, key.length). The entry
 * returned by the previous entry() call will be invalid.
 *
 * @param key
 *          The input key
 * @return true if we find an equal key.
 * @throws IOException
 *           propagated from {@link #seekTo(byte[], int, int)}.
 */
public boolean seekTo(byte[] key) throws IOException {
  return seekTo(key, 0, key.length);
}
1346
/**
 * Move the cursor to the first entry whose key is greater than or equal
 * to the input key. The entry returned by the previous entry() call will
 * be invalid.
 *
 * @param key
 *          The input key
 * @param keyOffset
 *          offset in the key buffer.
 * @param keyLen
 *          key buffer length.
 * @return true if we find an equal key; false otherwise.
 * @throws IOException
 *           propagated from the internal seek.
 */
public boolean seekTo(byte[] key, int keyOffset, int keyLen)
    throws IOException {
  // beyond == false: an entry with an equal key is an acceptable target.
  return seekTo(new ByteArray(key, keyOffset, keyLen), false);
}
1365
/**
 * Move the cursor to the first entry in the scan range whose key is
 * greater than or equal to the input key, or strictly greater when
 * {@code beyond} is true.
 *
 * @param key
 *          the key to seek to.
 * @param beyond
 *          true to skip entries whose key equals the input key.
 * @return the result of the in-block search (true if an equal key was
 *         found), or false if the target block lies at or after the end
 *         location, in which case the cursor is moved to the end location.
 * @throws IOException
 *           propagated from the block lookup or from moving the cursor.
 */
private boolean seekTo(RawComparable key, boolean beyond)
    throws IOException {
  Location l = reader.getBlockContainsKey(key, beyond);
  if (l.compareTo(beginLocation) < 0) {
    // never start searching before the begin of the scan range.
    l = beginLocation;
  } else if (l.compareTo(endLocation) >= 0) {
    seekTo(endLocation);
    return false;
  }

  // check if what we are seeking is in the later part of the current
  // block.
  if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex())
      || (compareCursorKeyTo(key) >= 0)) {
    // sorry, we must seek to a different location first.
    seekTo(l);
  }

  // scan forward from the cursor until the key condition is met.
  return inBlockAdvance(key, beyond);
}
1386
1387 /**
1388 * Move the cursor to the new location. The entry returned by the previous
1389 * entry() call will be invalid.
1390 *
1391 * @param l
1392 * new cursor location. It must fall between the begin and end
1393 * location of the scanner.
1394 * @throws IOException
1395 */
1396 private void seekTo(Location l) throws IOException {
1397 if (l.compareTo(beginLocation) < 0) {
1398 throw new IllegalArgumentException(
1399 "Attempt to seek before the begin location.");
1400 }
1401
1402 if (l.compareTo(endLocation) > 0) {
1403 throw new IllegalArgumentException(
1404 "Attempt to seek after the end location.");
1405 }
1406
1407 if (l.compareTo(endLocation) == 0) {
1408 parkCursorAtEnd();
1409 return;
1410 }
1411
1412 if (l.getBlockIndex() != currentLocation.getBlockIndex()) {
1413 // going to a totally different block
1414 initBlock(l.getBlockIndex());
1415 } else {
1416 if (valueChecked) {
1417 // may temporarily go beyond the last record in the block (in which
1418 // case the next if loop will always be true).
1419 inBlockAdvance(1);
1420 }
1421 if (l.getRecordIndex() < currentLocation.getRecordIndex()) {
1422 initBlock(l.getBlockIndex());
1423 }
1424 }
1425
1426 inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex());
1427
1428 return;
1429 }
1430
/**
 * Rewind to the first entry in the scanner, i.e. seek to the begin
 * location of the scan range. The entry returned by the previous entry()
 * call will be invalid.
 *
 * @throws IOException
 *           propagated from the seek.
 */
public void rewind() throws IOException {
  seekTo(beginLocation);
}
1440
/**
 * Seek to the end of the scanner. The entry returned by the previous
 * entry() call will be invalid.
 *
 * @throws IOException
 *           propagated from closing the current block reader.
 */
public void seekToEnd() throws IOException {
  parkCursorAtEnd();
}
1450
/**
 * Move the cursor to the first entry whose key is greater than or equal
 * to the input key. Synonymous to lowerBound(key, 0, key.length). The
 * entry returned by the previous entry() call will be invalid.
 *
 * @param key
 *          The input key
 * @throws IOException
 *           propagated from {@link #lowerBound(byte[], int, int)}.
 */
public void lowerBound(byte[] key) throws IOException {
  lowerBound(key, 0, key.length);
}
1463
/**
 * Move the cursor to the first entry whose key is greater than or equal
 * to the input key. The entry returned by the previous entry() call will
 * be invalid.
 *
 * @param key
 *          The input key
 * @param keyOffset
 *          offset in the key buffer.
 * @param keyLen
 *          key buffer length.
 * @throws IOException
 *           propagated from the internal seek.
 */
public void lowerBound(byte[] key, int keyOffset, int keyLen)
    throws IOException {
  // beyond == false: stop on an equal key; the boolean result is dropped.
  seekTo(new ByteArray(key, keyOffset, keyLen), false);
}
1481
/**
 * Move the cursor to the first entry whose key is strictly greater than
 * the input key. Synonymous to upperBound(key, 0, key.length). The entry
 * returned by the previous entry() call will be invalid.
 *
 * @param key
 *          The input key
 * @throws IOException
 *           propagated from {@link #upperBound(byte[], int, int)}.
 */
public void upperBound(byte[] key) throws IOException {
  upperBound(key, 0, key.length);
}
1494
/**
 * Move the cursor to the first entry whose key is strictly greater than
 * the input key. The entry returned by the previous entry() call will be
 * invalid.
 *
 * @param key
 *          The input key
 * @param keyOffset
 *          offset in the key buffer.
 * @param keyLen
 *          key buffer length.
 * @throws IOException
 *           propagated from the internal seek.
 */
public void upperBound(byte[] key, int keyOffset, int keyLen)
    throws IOException {
  // beyond == true: entries with an equal key are skipped as well.
  seekTo(new ByteArray(key, keyOffset, keyLen), true);
}
1512
1513 /**
1514 * Move the cursor to the next key-value pair. The entry returned by the
1515 * previous entry() call will be invalid.
1516 *
1517 * @return true if the cursor successfully moves. False when cursor is
1518 * already at the end location and cannot be advanced.
1519 * @throws IOException
1520 */
1521 public boolean advance() throws IOException {
1522 if (atEnd()) {
1523 return false;
1524 }
1525
1526 int curBid = currentLocation.getBlockIndex();
1527 long curRid = currentLocation.getRecordIndex();
1528 long entriesInBlock = reader.getBlockEntryCount(curBid);
1529 if (curRid + 1 >= entriesInBlock) {
1530 if (endLocation.compareTo(curBid + 1, 0) <= 0) {
1531 // last entry in TFile.
1532 parkCursorAtEnd();
1533 } else {
1534 // last entry in Block.
1535 initBlock(curBid + 1);
1536 }
1537 } else {
1538 inBlockAdvance(1);
1539 }
1540 return true;
1541 }
1542
1543 /**
1544 * Load a compressed block for reading. Expecting blockIndex is valid.
1545 *
1546 * @throws IOException
1547 */
1548 private void initBlock(int blockIndex) throws IOException {
1549 klen = -1;
1550 if (blkReader != null) {
1551 try {
1552 blkReader.close();
1553 } finally {
1554 blkReader = null;
1555 }
1556 }
1557 blkReader = reader.getBlockReader(blockIndex);
1558 currentLocation.set(blockIndex, 0);
1559 }
1560
1561 private void parkCursorAtEnd() throws IOException {
1562 klen = -1;
1563 currentLocation.set(endLocation);
1564 if (blkReader != null) {
1565 try {
1566 blkReader.close();
1567 } finally {
1568 blkReader = null;
1569 }
1570 }
1571 }
1572
/**
 * Close the scanner. Release all resources. The behavior of using the
 * scanner after calling close is not defined. The entry returned by the
 * previous entry() call will be invalid.
 * <p>
 * Implemented by parking the cursor at the end location, which also closes
 * the current block reader.
 *
 * @throws IOException
 *           propagated from closing the block reader.
 */
@Override
public void close() throws IOException {
  parkCursorAtEnd();
}
1582
/**
 * Is cursor at the end location?
 *
 * @return true if the cursor is at (or past) the end location.
 */
public boolean atEnd() {
  return (currentLocation.compareTo(endLocation) >= 0);
}
1591
/**
 * check whether we have already successfully obtained the key. If not,
 * read the key of the record under the cursor into keyBuffer. It also
 * initializes the valueInputStream.
 *
 * @throws EOFException
 *           if the cursor is at the end location.
 * @throws IOException
 *           propagated from reading the block.
 */
void checkKey() throws IOException {
  // klen >= 0 means the key of the current record is already cached.
  if (klen >= 0) return;
  if (atEnd()) {
    throw new EOFException("No key-value to read");
  }
  // reset the per-record state before reading the new record.
  klen = -1;
  vlen = -1;
  valueChecked = false;

  // record layout as read here: key length, key bytes, then the value.
  klen = Utils.readVInt(blkReader);
  blkReader.readFully(keyBuffer, 0, klen);
  valueBufferInputStream.reset(blkReader);
  // the value length is only known up front when the value consists of a
  // single (last) chunk.
  if (valueBufferInputStream.isLastChunk()) {
    vlen = valueBufferInputStream.getRemain();
  }
}
1612
/**
 * Get an entry to access the key and value. The key of the record under
 * the cursor is read first (if it has not been read yet); a new Entry
 * object is returned on every call.
 *
 * @return The Entry object to access the key and value.
 * @throws IOException
 *           propagated from reading the key; in particular an EOFException
 *           if the cursor is at the end location.
 */
public Entry entry() throws IOException {
  checkKey();
  return new Entry();
}
1623
/**
 * Get the RecordNum corresponding to the entry pointed by the cursor.
 *
 * @return The RecordNum corresponding to the entry pointed by the cursor.
 * @throws IOException
 *           propagated from the reader's location-to-record-number
 *           translation.
 */
public long getRecordNum() throws IOException {
  return reader.getRecordNumByLocation(currentLocation);
}
1632
/**
 * Internal API. Comparing the key at cursor to user-specified key. The
 * key at the cursor is read first if it has not been read yet.
 *
 * @param other
 *          user-specified key.
 * @return negative if key at cursor is smaller than user key; 0 if equal;
 *         and positive if key at cursor greater than user key.
 * @throws IOException
 *           propagated from reading the key at the cursor.
 */
int compareCursorKeyTo(RawComparable other) throws IOException {
  checkKey();
  return reader.compareKeys(keyBuffer, 0, klen, other.buffer(), other
      .offset(), other.size());
}
1647
1648 /**
1649 * Entry to a <Key, Value> pair.
1650 */
1651 public class Entry implements Comparable<RawComparable> {
/**
 * Get the length of the key. The value is read from the enclosing
 * scanner's {@code klen} field, which is -1 while no key is cached.
 *
 * @return the length of the key.
 */
public int getKeyLength() {
  return klen;
}
1660
/**
 * Get the enclosing scanner's reusable key buffer. The array itself is
 * returned (no copy); only the first {@link #getKeyLength()} bytes belong
 * to the key.
 *
 * @return the scanner's key buffer.
 */
byte[] getKeyBuffer() {
  return keyBuffer;
}
1664
/**
 * Copy the key and value in one shot into BytesWritables. This is
 * equivalent to getKey(key); getValue(value);
 *
 * @param key
 *          BytesWritable to hold key.
 * @param value
 *          BytesWritable to hold value
 * @throws IOException
 *           propagated from copying the key or reading the value.
 */
public void get(BytesWritable key, BytesWritable value)
    throws IOException {
  getKey(key);
  getValue(value);
}
1680
/**
 * Copy the key into BytesWritable. The input BytesWritable will be
 * automatically resized to the actual key size.
 *
 * @param key
 *          BytesWritable to hold the key.
 * @return the length of {@code key} after the copy.
 * @throws IOException
 *           declared by the byte-array based getKey this method calls.
 */
public int getKey(BytesWritable key) throws IOException {
  // size the target first so its backing array can hold the whole key.
  key.setSize(getKeyLength());
  getKey(key.getBytes());
  return key.getLength();
}
1694
1695 /**
1696 * Copy the value into BytesWritable. The input BytesWritable will be
1697 * automatically resized to the actual value size. The implementation
1698 * directly uses the buffer inside BytesWritable for storing the value.
1699 * The call does not require the value length to be known.
1700 *
1701 * @param value
1702 * @throws IOException
1703 */
1704 public long getValue(BytesWritable value) throws IOException {
1705 DataInputStream dis = getValueStream();
1706 int size = 0;
1707 try {
1708 int remain;
1709 while ((remain = valueBufferInputStream.getRemain()) > 0) {
1710 value.setSize(size + remain);
1711 dis.readFully(value.getBytes(), size, remain);
1712 size += remain;
1713 }
1714 return value.getLength();
1715 } finally {
1716 dis.close();
1717 }
1718 }
1719
/**
 * Writing the key to the output stream. This method avoids copying key
 * buffer from Scanner into user buffer, then writing to the output
 * stream.
 *
 * @param out
 *          The output stream
 * @return the length of the key.
 * @throws IOException
 *           propagated from writing to {@code out}.
 */
public int writeKey(OutputStream out) throws IOException {
  // write straight from the scanner's key buffer.
  out.write(keyBuffer, 0, klen);
  return klen;
}
1734
1735 /**
1736 * Writing the value to the output stream. This method avoids copying
1737 * value data from Scanner into user buffer, then writing to the output
1738 * stream. It does not require the value length to be known.
1739 *
1740 * @param out
1741 * The output stream
1742 * @return the length of the value
1743 * @throws IOException
1744 */
1745 public long writeValue(OutputStream out) throws IOException {
1746 DataInputStream dis = getValueStream();
1747 long size = 0;
1748 try {
1749 int chunkSize;
1750 while ((chunkSize = valueBufferInputStream.getRemain()) > 0) {
1751 chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE);
1752 valTransferBuffer.setSize(chunkSize);
1753 dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize);
1754 out.write(valTransferBuffer.getBytes(), 0, chunkSize);
1755 size += chunkSize;
1756 }
1757 return size;
1758 } finally {
1759 dis.close();
1760 }
1761 }
1762
/**
 * Copy the key into user supplied buffer. Synonymous to getKey(buf, 0).
 *
 * @param buf
 *          The buffer supplied by user. The length of the buffer must
 *          not be shorter than the key length.
 * @return The length of the key.
 *
 * @throws IOException
 *           declared by {@link #getKey(byte[], int)}.
 */
public int getKey(byte[] buf) throws IOException {
  return getKey(buf, 0);
}
1776
1777 /**
1778 * Copy the key into user supplied buffer.
1779 *
1780 * @param buf
1781 * The buffer supplied by user.
1782 * @param offset
1783 * The starting offset of the user buffer where we should copy
1784 * the key into. Requiring the key-length + offset no greater
1785 * than the buffer length.
1786 * @return The length of the key.
1787 * @throws IOException
1788 */
1789 public int getKey(byte[] buf, int offset) throws IOException {
1790 if ((offset | (buf.length - offset - klen)) < 0) {
1791 throw new IndexOutOfBoundsException(
1792 "Bufer not enough to store the key");
1793 }
1794 System.arraycopy(keyBuffer, 0, buf, offset, klen);
1795 return klen;
1796 }
1797
/**
 * Streaming access to the key. Useful for deserializing the key into
 * user objects.
 * <p>
 * The scanner's single key stream is reset onto the key bytes and
 * returned, so every call hands out the same stream object.
 *
 * @return The input stream.
 */
public DataInputStream getKeyStream() {
  keyDataInputStream.reset(keyBuffer, klen);
  return keyDataInputStream;
}
1808
1809 /**
1810 * Get the length of the value. isValueLengthKnown() must be tested
1811 * true.
1812 *
1813 * @return the length of the value.
1814 */
1815 public int getValueLength() {
1816 if (vlen >= 0) {
1817 return vlen;
1818 }
1819
1820 throw new RuntimeException("Value length unknown.");
1821 }
1822
/**
 * Copy value into user-supplied buffer. User supplied buffer must be
 * large enough to hold the whole value. The value part of the key-value
 * pair pointed by the current cursor is not cached and can only be
 * examined once. Calling any of the following functions more than once
 * without moving the cursor will result in exception:
 * {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
 * {@link #getValueStream}.
 *
 * @param buf
 *          The buffer supplied by user.
 * @return the length of the value. Does not require
 *         isValueLengthKnown() to be true.
 * @throws IOException
 *           propagated from {@link #getValue(byte[], int)}.
 */
public int getValue(byte[] buf) throws IOException {
  return getValue(buf, 0);
}
1840
/**
 * Copy value into user-supplied buffer. User supplied buffer must be
 * large enough to hold the whole value (starting from the offset). The
 * value part of the key-value pair pointed by the current cursor is not
 * cached and can only be examined once. Calling any of the following
 * functions more than once without moving the cursor will result in
 * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
 * {@link #getValueStream}.
 *
 * @param buf
 *          The buffer supplied by user.
 * @param offset
 *          The starting offset in {@code buf} where the value is copied
 *          to.
 * @return the length of the value. Does not require
 *         isValueLengthKnown() to be true.
 * @throws IOException
 *           propagated from reading the value.
 * @throws IndexOutOfBoundsException
 *           if the buffer is too small to hold the value.
 */
public int getValue(byte[] buf, int offset) throws IOException {
  DataInputStream dis = getValueStream();
  try {
    if (isValueLengthKnown()) {
      // known length: validate the target range, then read in one go.
      if ((offset | (buf.length - offset - vlen)) < 0) {
        throw new IndexOutOfBoundsException(
            "Buffer too small to hold value");
      }
      dis.readFully(buf, offset, vlen);
      return vlen;
    }

    // unknown length: read until the buffer is full or the value ends.
    int nextOffset = offset;
    while (nextOffset < buf.length) {
      int n = dis.read(buf, nextOffset, buf.length - nextOffset);
      if (n < 0) {
        break;
      }
      nextOffset += n;
    }
    if (dis.read() >= 0) {
      // attempt to read one more byte to determine whether we reached
      // the end or not.
      throw new IndexOutOfBoundsException(
          "Buffer too small to hold value");
    }
    return nextOffset - offset;
  } finally {
    dis.close();
  }
}
1886
1887 /**
1888 * Stream access to value. The value part of the key-value pair pointed
1889 * by the current cursor is not cached and can only be examined once.
1890 * Calling any of the following functions more than once without moving
1891 * the cursor will result in exception: {@link #getValue(byte[])},
1892 * {@link #getValue(byte[], int)}, {@link #getValueStream}.
1893 *
1894 * @return The input stream for reading the value.
1895 * @throws IOException
1896 */
1897 public DataInputStream getValueStream() throws IOException {
1898 if (valueChecked == true) {
1899 throw new IllegalStateException(
1900 "Attempt to examine value multiple times.");
1901 }
1902 valueChecked = true;
1903 return valueDataInputStream;
1904 }
1905
/**
 * Check whether it is safe to call getValueLength().
 *
 * @return true if value length is known before hand. Values less than
 *         the chunk size will always have their lengths known before
 *         hand. Values that are written out as a whole (with advertised
 *         length up-front) will always have their lengths known in
 *         read.
 */
public boolean isValueLengthKnown() {
  // vlen stays negative while the value length is unknown.
  return (vlen >= 0);
}
1918
/**
 * Compare the entry key to another key. Synonymous to compareTo(key, 0,
 * key.length).
 *
 * @param buf
 *          The key buffer.
 * @return comparison result between the entry key with the input key.
 */
public int compareTo(byte[] buf) {
  return compareTo(buf, 0, buf.length);
}
1930
/**
 * Compare the entry key to another key. Synonymous to compareTo(new
 * ByteArray(buf, offset, length)).
 *
 * @param buf
 *          The key buffer
 * @param offset
 *          offset into the key buffer.
 * @param length
 *          the length of the key.
 * @return comparison result between the entry key with the input key.
 */
public int compareTo(byte[] buf, int offset, int length) {
  return compareTo(new ByteArray(buf, offset, length));
}
1946
/**
 * Compare an entry with a RawComparable object. This is useful when
 * Entries are stored in a collection, and we want to compare a user
 * supplied key.
 * <p>
 * The comparison is carried out by the reader's compareKeys, which throws
 * a RuntimeException for unsorted TFiles.
 *
 * @param key
 *          the key to compare the entry key with.
 * @return comparison result between the entry key with the input key.
 */
@Override
public int compareTo(RawComparable key) {
  return reader.compareKeys(keyBuffer, 0, getKeyLength(), key.buffer(),
      key.offset(), key.size());
}
1957
/**
 * Compare whether this and other points to the same key value. Two
 * entries are equal when the other entry's compareTo reports 0 for this
 * entry's key bytes.
 * <p>
 * NOTE(review): compareTo goes through the reader's compareKeys, which
 * throws a RuntimeException for unsorted TFiles, so this method can throw
 * as well; confirm that is intended.
 *
 * @param other
 *          the object to compare with.
 * @return true if {@code other} is an Entry whose key compares equal to
 *         this entry's key.
 */
@Override
public boolean equals(Object other) {
  if (this == other) return true;
  if (!(other instanceof Entry)) return false;
  return ((Entry) other).compareTo(keyBuffer, 0, getKeyLength()) == 0;
}
1967
/**
 * Hash of the key bytes of this entry (the first getKeyLength() bytes of
 * the key buffer), computed with WritableComparator.hashBytes.
 *
 * @return the hash code of the key.
 */
@Override
public int hashCode() {
  return WritableComparator.hashBytes(keyBuffer, 0, getKeyLength());
}
1972 }
1973
1974 /**
1975 * Advance cursor by n positions within the block.
1976 *
1977 * @param n
1978 * Number of key-value pairs to skip in block.
1979 * @throws IOException
1980 */
1981 private void inBlockAdvance(long n) throws IOException {
1982 for (long i = 0; i < n; ++i) {
1983 checkKey();
1984 if (!valueBufferInputStream.isClosed()) {
1985 valueBufferInputStream.close();
1986 }
1987 klen = -1;
1988 currentLocation.incRecordIndex();
1989 }
1990 }
1991
/**
 * Advance cursor in block until we find a key that is greater than or
 * equal to the input key.
 *
 * @param key
 *          Key to compare.
 * @param greater
 *          advance until we find a key greater than the input key.
 * @return true if we find a equal key (only possible when
 *         {@code greater} is false); false if the cursor stopped on a
 *         greater key.
 * @throws IOException
 *           propagated from reading keys or closing value streams.
 * @throws RuntimeException
 *           if the records available in the block are exhausted without
 *           finding a matching key.
 */
private boolean inBlockAdvance(RawComparable key, boolean greater)
    throws IOException {
  int curBid = currentLocation.getBlockIndex();
  long entryInBlock = reader.getBlockEntryCount(curBid);
  if (curBid == endLocation.getBlockIndex()) {
    // the scan range ends inside this block: stop at the end record.
    entryInBlock = endLocation.getRecordIndex();
  }

  while (currentLocation.getRecordIndex() < entryInBlock) {
    int cmp = compareCursorKeyTo(key);
    if (cmp > 0) return false;
    if (cmp == 0 && !greater) return true;
    // step over the current record: close its value stream if still
    // open, invalidate the cached key and move the cursor.
    if (!valueBufferInputStream.isClosed()) {
      valueBufferInputStream.close();
    }
    klen = -1;
    currentLocation.incRecordIndex();
  }

  throw new RuntimeException("Cannot find matching key in block.");
}
2024 }
2025
2026 long getBlockEntryCount(int curBid) {
2027 return tfileIndex.getEntry(curBid).entries();
2028 }
2029
2030 BlockReader getBlockReader(int blockIndex) throws IOException {
2031 return readerBCF.getDataBlock(blockIndex);
2032 }
2033 }
2034
2035 /**
2036 * Data structure representing "TFile.meta" meta block.
2037 */
2038 static final class TFileMeta {
2039 final static String BLOCK_NAME = "TFile.meta";
2040 final Version version;
2041 private long recordCount;
2042 private final String strComparator;
2043 private final BytesComparator comparator;
2044
2045 // ctor for writes
2046 public TFileMeta(String comparator) {
2047 // set fileVersion to API version when we create it.
2048 version = TFile.API_VERSION;
2049 recordCount = 0;
2050 strComparator = (comparator == null) ? "" : comparator;
2051 this.comparator = makeComparator(strComparator);
2052 }
2053
2054 // ctor for reads
2055 public TFileMeta(DataInput in) throws IOException {
2056 version = new Version(in);
2057 if (!version.compatibleWith(TFile.API_VERSION)) {
2058 throw new RuntimeException("Incompatible TFile fileVersion.");
2059 }
2060 recordCount = Utils.readVLong(in);
2061 strComparator = Utils.readString(in);
2062 comparator = makeComparator(strComparator);
2063 }
2064
2065 @SuppressWarnings("unchecked")
2066 static BytesComparator makeComparator(String comparator) {
2067 if (comparator.length() == 0) {
2068 // unsorted keys
2069 return null;
2070 }
2071 if (comparator.equals(COMPARATOR_MEMCMP)) {
2072 // default comparator
2073 return new BytesComparator(new MemcmpRawComparator());
2074 } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
2075 String compClassName =
2076 comparator.substring(COMPARATOR_JCLASS.length()).trim();
2077 try {
2078 Class compClass = Class.forName(compClassName);
2079 // use its default ctor to create an instance
2080 return new BytesComparator((RawComparator<Object>) compClass
2081 .newInstance());
2082 } catch (Exception e) {
2083 throw new IllegalArgumentException(
2084 "Failed to instantiate comparator: " + comparator + "("
2085 + e.toString() + ")");
2086 }
2087 } else {
2088 throw new IllegalArgumentException("Unsupported comparator: "
2089 + comparator);
2090 }
2091 }
2092
2093 public void write(DataOutput out) throws IOException {
2094 TFile.API_VERSION.write(out);
2095 Utils.writeVLong(out, recordCount);
2096 Utils.writeString(out, strComparator);
2097 }
2098
2099 public long getRecordCount() {
2100 return recordCount;
2101 }
2102
2103 public void incRecordCount() {
2104 ++recordCount;
2105 }
2106
2107 public boolean isSorted() {
2108 return !strComparator.isEmpty();
2109 }
2110
2111 public String getComparatorString() {
2112 return strComparator;
2113 }
2114
2115 public BytesComparator getComparator() {
2116 return comparator;
2117 }
2118
2119 public Version getVersion() {
2120 return version;
2121 }
2122 } // END: class MetaTFileMeta
2123
2124 /**
2125 * Data structure representing "TFile.index" meta block.
2126 */
2127 static class TFileIndex {
2128 final static String BLOCK_NAME = "TFile.index";
2129 private ByteArray firstKey;
2130 private final ArrayList<TFileIndexEntry> index;
2131 private final ArrayList<Long> recordNumIndex;
2132 private final BytesComparator comparator;
2133 private long sum = 0;
2134
2135 /**
2136 * For reading from file.
2137 *
2138 * @throws IOException
2139 */
2140 public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
2141 throws IOException {
2142 index = new ArrayList<TFileIndexEntry>(entryCount);
2143 recordNumIndex = new ArrayList<Long>(entryCount);
2144 int size = Utils.readVInt(in); // size for the first key entry.
2145 if (size > 0) {
2146 byte[] buffer = new byte[size];
2147 in.readFully(buffer);
2148 DataInputStream firstKeyInputStream =
2149 new DataInputStream(new ByteArrayInputStream(buffer, 0, size));
2150
2151 int firstKeyLength = Utils.readVInt(firstKeyInputStream);
2152 firstKey = new ByteArray(new byte[firstKeyLength]);
2153 firstKeyInputStream.readFully(firstKey.buffer());
2154
2155 for (int i = 0; i < entryCount; i++) {
2156 size = Utils.readVInt(in);
2157 if (buffer.length < size) {
2158 buffer = new byte[size];
2159 }
2160 in.readFully(buffer, 0, size);
2161 TFileIndexEntry idx =
2162 new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
2163 buffer, 0, size)));
2164 index.add(idx);
2165 sum += idx.entries();
2166 recordNumIndex.add(sum);
2167 }
2168 } else {
2169 if (entryCount != 0) {
2170 throw new RuntimeException("Internal error");
2171 }
2172 }
2173 this.comparator = comparator;
2174 }
2175
2176 /**
2177 * @param key
2178 * input key.
2179 * @return the ID of the first block that contains key >= input key. Or -1
2180 * if no such block exists.
2181 */
2182 public int lowerBound(RawComparable key) {
2183 if (comparator == null) {
2184 throw new RuntimeException("Cannot search in unsorted TFile");
2185 }
2186
2187 if (firstKey == null) {
2188 return -1; // not found
2189 }
2190
2191 int ret = Utils.lowerBound(index, key, comparator);
2192 if (ret == index.size()) {
2193 return -1;
2194 }
2195 return ret;
2196 }
2197
2198 /**
2199 * @param key
2200 * input key.
2201 * @return the ID of the first block that contains key > input key. Or -1
2202 * if no such block exists.
2203 */
2204 public int upperBound(RawComparable key) {
2205 if (comparator == null) {
2206 throw new RuntimeException("Cannot search in unsorted TFile");
2207 }
2208
2209 if (firstKey == null) {
2210 return -1; // not found
2211 }
2212
2213 int ret = Utils.upperBound(index, key, comparator);
2214 if (ret == index.size()) {
2215 return -1;
2216 }
2217 return ret;
2218 }
2219
2220 /**
2221 * For writing to file.
2222 */
2223 public TFileIndex(BytesComparator comparator) {
2224 index = new ArrayList<TFileIndexEntry>();
2225 recordNumIndex = new ArrayList<Long>();
2226 this.comparator = comparator;
2227 }
2228
2229 public RawComparable getFirstKey() {
2230 return firstKey;
2231 }
2232
2233 public Reader.Location getLocationByRecordNum(long recNum) {
2234 int idx = Utils.upperBound(recordNumIndex, recNum);
2235 long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1);
2236 return new Reader.Location(idx, recNum-lastRecNum);
2237 }
2238
2239 public long getRecordNumByLocation(Reader.Location location) {
2240 int blkIndex = location.getBlockIndex();
2241 long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1);
2242 return lastRecNum + location.getRecordIndex();
2243 }
2244
2245 public void setFirstKey(byte[] key, int offset, int length) {
2246 firstKey = new ByteArray(new byte[length]);
2247 System.arraycopy(key, offset, firstKey.buffer(), 0, length);
2248 }
2249
2250 public RawComparable getLastKey() {
2251 if (index.size() == 0) {
2252 return null;
2253 }
2254 return new ByteArray(index.get(index.size() - 1).buffer());
2255 }
2256
2257 public void addEntry(TFileIndexEntry keyEntry) {
2258 index.add(keyEntry);
2259 sum += keyEntry.entries();
2260 recordNumIndex.add(sum);
2261 }
2262
2263 public TFileIndexEntry getEntry(int bid) {
2264 return index.get(bid);
2265 }
2266
2267 public void write(DataOutput out) throws IOException {
2268 if (firstKey == null) {
2269 Utils.writeVInt(out, 0);
2270 return;
2271 }
2272
2273 DataOutputBuffer dob = new DataOutputBuffer();
2274 Utils.writeVInt(dob, firstKey.size());
2275 dob.write(firstKey.buffer());
2276 Utils.writeVInt(out, dob.size());
2277 out.write(dob.getData(), 0, dob.getLength());
2278
2279 for (TFileIndexEntry entry : index) {
2280 dob.reset();
2281 entry.write(dob);
2282 Utils.writeVInt(out, dob.getLength());
2283 out.write(dob.getData(), 0, dob.getLength());
2284 }
2285 }
2286 }
2287
2288 /**
2289 * TFile Data Index entry. We should try to make the memory footprint of each
2290 * index entry as small as possible.
2291 */
2292 static final class TFileIndexEntry implements RawComparable {
2293 final byte[] key;
2294 // count of <key, value> entries in the block.
2295 final long kvEntries;
2296
2297 public TFileIndexEntry(DataInput in) throws IOException {
2298 int len = Utils.readVInt(in);
2299 key = new byte[len];
2300 in.readFully(key, 0, len);
2301 kvEntries = Utils.readVLong(in);
2302 }
2303
2304 // default entry, without any padding
2305 public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) {
2306 key = new byte[len];
2307 System.arraycopy(newkey, offset, key, 0, len);
2308 this.kvEntries = entries;
2309 }
2310
2311 @Override
2312 public byte[] buffer() {
2313 return key;
2314 }
2315
2316 @Override
2317 public int offset() {
2318 return 0;
2319 }
2320
2321 @Override
2322 public int size() {
2323 return key.length;
2324 }
2325
2326 long entries() {
2327 return kvEntries;
2328 }
2329
2330 public void write(DataOutput out) throws IOException {
2331 Utils.writeVInt(out, key.length);
2332 out.write(key, 0, key.length);
2333 Utils.writeVLong(out, kvEntries);
2334 }
2335 }
2336
2337 /**
2338 * Dumping the TFile information.
2339 *
2340 * @param args
2341 * A list of TFile paths.
2342 */
2343 public static void main(String[] args) {
2344 System.out.printf("TFile Dumper (TFile %s, BCFile %s)\n", TFile.API_VERSION
2345 .toString(), BCFile.API_VERSION.toString());
2346 if (args.length == 0) {
2347 System.out
2348 .println("Usage: java ... org.apache.hadoop.io.file.tfile.TFile tfile-path [tfile-path ...]");
2349 System.exit(0);
2350 }
2351 Configuration conf = new Configuration();
2352
2353 for (String file : args) {
2354 System.out.println("===" + file + "===");
2355 try {
2356 TFileDumper.dumpInfo(file, System.out, conf);
2357 } catch (IOException e) {
2358 e.printStackTrace(System.err);
2359 }
2360 }
2361 }
2362 }