001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements. See the NOTICE file distributed with this
004     * work for additional information regarding copyright ownership. The ASF
005     * licenses this file to you under the Apache License, Version 2.0 (the
006     * "License"); you may not use this file except in compliance with the License.
007     * You may obtain a copy of the License at
008     * 
009     * http://www.apache.org/licenses/LICENSE-2.0
010     * 
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014     * License for the specific language governing permissions and limitations under
015     * the License.
016     */
017    
018    package org.apache.hadoop.io.file.tfile;
019    
020    import java.io.ByteArrayInputStream;
021    import java.io.Closeable;
022    import java.io.DataInput;
023    import java.io.DataInputStream;
024    import java.io.DataOutput;
025    import java.io.DataOutputStream;
026    import java.io.EOFException;
027    import java.io.IOException;
028    import java.io.OutputStream;
029    import java.util.ArrayList;
030    import java.util.Comparator;
031    
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    import org.apache.hadoop.classification.InterfaceAudience;
035    import org.apache.hadoop.classification.InterfaceStability;
036    import org.apache.hadoop.conf.Configuration;
037    import org.apache.hadoop.fs.FSDataInputStream;
038    import org.apache.hadoop.fs.FSDataOutputStream;
039    import org.apache.hadoop.io.BoundedByteArrayOutputStream;
040    import org.apache.hadoop.io.BytesWritable;
041    import org.apache.hadoop.io.DataInputBuffer;
042    import org.apache.hadoop.io.DataOutputBuffer;
043    import org.apache.hadoop.io.IOUtils;
044    import org.apache.hadoop.io.RawComparator;
045    import org.apache.hadoop.io.WritableComparator;
046    import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader;
047    import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender;
048    import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder;
049    import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder;
050    import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator;
051    import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator;
052    import org.apache.hadoop.io.file.tfile.Utils.Version;
053    import org.apache.hadoop.io.serializer.JavaSerializationComparator;
054    
055    /**
056     * A TFile is a container of key-value pairs. Both keys and values are type-less
057     * bytes. Keys are restricted to 64KB, value length is not restricted
058     * (practically limited to the available disk storage). TFile further provides
059     * the following features:
060     * <ul>
061     * <li>Block Compression.
062     * <li>Named meta data blocks.
063     * <li>Sorted or unsorted keys.
064     * <li>Seek by key or by file offset.
065     * </ul>
066     * The memory footprint of a TFile includes the following:
067     * <ul>
068     * <li>Some constant overhead of reading or writing a compressed block.
069     * <ul>
070     * <li>Each compressed block requires one compression/decompression codec for
071     * I/O.
072     * <li>Temporary space to buffer the key.
073     * <li>Temporary space to buffer the value (for TFile.Writer only). Values are
074     * chunk encoded, so that we buffer at most one chunk of user data. By default,
075     * the chunk buffer is 1MB. Reading chunked value does not require additional
076     * memory.
077     * </ul>
078     * <li>TFile index, which is proportional to the total number of Data Blocks.
079     * The total amount of memory needed to hold the index can be estimated as
080     * (56+AvgKeySize)*NumBlocks.
081     * <li>MetaBlock index, which is proportional to the total number of Meta
082     * Blocks.The total amount of memory needed to hold the index for Meta Blocks
083     * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock.
084     * </ul>
085     * <p>
086     * The behavior of TFile can be customized by the following variables through
087     * Configuration:
088     * <ul>
089     * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes). Default
090     * to 1MB. Values of the length less than the chunk size is guaranteed to have
091     * known value length in read time (See
092     * {@link TFile.Reader.Scanner.Entry#isValueLengthKnown()}).
093     * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for
094     * FSDataOutputStream. Integer (in bytes). Default to 256KB.
095     * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for
096     * FSDataInputStream. Integer (in bytes). Default to 256KB.
097     * </ul>
098     * <p>
099     * Suggestions on performance optimization.
100     * <ul>
101     * <li>Minimum block size. We recommend a setting of minimum block size between
102     * 256KB to 1MB for general usage. Larger block size is preferred if files are
103     * primarily for sequential access. However, it would lead to inefficient random
104     * access (because there are more data to decompress). Smaller blocks are good
105     * for random access, but require more memory to hold the block index, and may
106     * be slower to create (because we must flush the compressor stream at the
107     * conclusion of each data block, which leads to an FS I/O flush). Further, due
108     * to the internal caching in Compression codec, the smallest possible block
109     * size would be around 20KB-30KB.
110     * <li>The current implementation does not offer true multi-threading for
111     * reading. The implementation uses FSDataInputStream seek()+read(), which is
112     * shown to be much faster than positioned-read call in single thread mode.
113     * However, it also means that if multiple threads attempt to access the same
114     * TFile (using multiple scanners) simultaneously, the actual I/O is carried out
115     * sequentially even if they access different DFS blocks.
116     * <li>Compression codec. Use "none" if the data is not very compressable (by
117     * compressable, I mean a compression ratio at least 2:1). Generally, use "lzo"
118     * as the starting point for experimenting. "gz" overs slightly better
119     * compression ratio over "lzo" but requires 4x CPU to compress and 2x CPU to
120     * decompress, comparing to "lzo".
121     * <li>File system buffering, if the underlying FSDataInputStream and
122     * FSDataOutputStream is already adequately buffered; or if applications
123     * reads/writes keys and values in large buffers, we can reduce the sizes of
124     * input/output buffering in TFile layer by setting the configuration parameters
125     * "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size".
126     * </ul>
127     * 
128     * Some design rationale behind TFile can be found at <a
129     * href=https://issues.apache.org/jira/browse/HADOOP-3315>Hadoop-3315</a>.
130     */
131    @InterfaceAudience.Public
132    @InterfaceStability.Evolving
133    public class TFile {
134      static final Log LOG = LogFactory.getLog(TFile.class);
135    
136      private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size";
137      private static final String FS_INPUT_BUF_SIZE_ATTR =
138          "tfile.fs.input.buffer.size";
139      private static final String FS_OUTPUT_BUF_SIZE_ATTR =
140          "tfile.fs.output.buffer.size";
141    
142      static int getChunkBufferSize(Configuration conf) {
143        int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024);
144        return (ret > 0) ? ret : 1024 * 1024;
145      }
146    
147      static int getFSInputBufferSize(Configuration conf) {
148        return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
149      }
150    
151      static int getFSOutputBufferSize(Configuration conf) {
152        return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
153      }
154    
155      private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB
156      static final Version API_VERSION = new Version((short) 1, (short) 0);
157    
158      /** compression: gzip */
159      public static final String COMPRESSION_GZ = "gz";
160      /** compression: lzo */
161      public static final String COMPRESSION_LZO = "lzo";
162      /** compression: none */
163      public static final String COMPRESSION_NONE = "none";
164      /** comparator: memcmp */
165      public static final String COMPARATOR_MEMCMP = "memcmp";
166      /** comparator prefix: java class */
167      public static final String COMPARATOR_JCLASS = "jclass:";
168    
169      /**
170       * Make a raw comparator from a string name.
171       * 
172       * @param name
173       *          Comparator name
174       * @return A RawComparable comparator.
175       */
176      static public Comparator<RawComparable> makeComparator(String name) {
177        return TFileMeta.makeComparator(name);
178      }
179    
180      // Prevent the instantiation of TFiles
181      private TFile() {
182        // nothing
183      }
184    
185      /**
186       * Get names of supported compression algorithms. The names are acceptable by
187       * TFile.Writer.
188       * 
189       * @return Array of strings, each represents a supported compression
190       *         algorithm. Currently, the following compression algorithms are
191       *         supported.
192       *         <ul>
193       *         <li>"none" - No compression.
194       *         <li>"lzo" - LZO compression.
195       *         <li>"gz" - GZIP compression.
196       *         </ul>
197       */
198      public static String[] getSupportedCompressionAlgorithms() {
199        return Compression.getSupportedAlgorithms();
200      }
201    
202      /**
203       * TFile Writer.
204       */
205      @InterfaceStability.Evolving
206      public static class Writer implements Closeable {
207        // minimum compressed size for a block.
208        private final int sizeMinBlock;
209    
210        // Meta blocks.
211        final TFileIndex tfileIndex;
212        final TFileMeta tfileMeta;
213    
214        // reference to the underlying BCFile.
215        private BCFile.Writer writerBCF;
216    
217        // current data block appender.
218        BlockAppender blkAppender;
219        long blkRecordCount;
220    
221        // buffers for caching the key.
222        BoundedByteArrayOutputStream currentKeyBufferOS;
223        BoundedByteArrayOutputStream lastKeyBufferOS;
224    
225        // buffer used by chunk codec
226        private byte[] valueBuffer;
227    
228        /**
229         * Writer states. The state always transits in circles: READY -> IN_KEY ->
230         * END_KEY -> IN_VALUE -> READY.
231         */
232        private enum State {
233          READY, // Ready to start a new key-value pair insertion.
234          IN_KEY, // In the middle of key insertion.
235          END_KEY, // Key insertion complete, ready to insert value.
236          IN_VALUE, // In value insertion.
237          // ERROR, // Error encountered, cannot continue.
238          CLOSED, // TFile already closed.
239        };
240    
241        // current state of Writer.
242        State state = State.READY;
243        Configuration conf;
244        long errorCount = 0;
245    
246        /**
247         * Constructor
248         * 
249         * @param fsdos
250         *          output stream for writing. Must be at position 0.
251         * @param minBlockSize
252         *          Minimum compressed block size in bytes. A compression block will
253         *          not be closed until it reaches this size except for the last
254         *          block.
255         * @param compressName
256         *          Name of the compression algorithm. Must be one of the strings
257         *          returned by {@link TFile#getSupportedCompressionAlgorithms()}.
258         * @param comparator
259         *          Leave comparator as null or empty string if TFile is not sorted.
260         *          Otherwise, provide the string name for the comparison algorithm
261         *          for keys. Two kinds of comparators are supported.
262         *          <ul>
263         *          <li>Algorithmic comparator: binary comparators that is language
264         *          independent. Currently, only "memcmp" is supported.
265         *          <li>Language-specific comparator: binary comparators that can
266         *          only be constructed in specific language. For Java, the syntax
267         *          is "jclass:", followed by the class name of the RawComparator.
268         *          Currently, we only support RawComparators that can be
269         *          constructed through the default constructor (with no
270         *          parameters). Parameterized RawComparators such as
271         *          {@link WritableComparator} or
272         *          {@link JavaSerializationComparator} may not be directly used.
273         *          One should write a wrapper class that inherits from such classes
274         *          and use its default constructor to perform proper
275         *          initialization.
276         *          </ul>
277         * @param conf
278         *          The configuration object.
279         * @throws IOException
280         */
281        public Writer(FSDataOutputStream fsdos, int minBlockSize,
282            String compressName, String comparator, Configuration conf)
283            throws IOException {
284          sizeMinBlock = minBlockSize;
285          tfileMeta = new TFileMeta(comparator);
286          tfileIndex = new TFileIndex(tfileMeta.getComparator());
287    
288          writerBCF = new BCFile.Writer(fsdos, compressName, conf);
289          currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
290          lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
291          this.conf = conf;
292        }
293    
294        /**
295         * Close the Writer. Resources will be released regardless of the exceptions
296         * being thrown. Future close calls will have no effect.
297         * 
298         * The underlying FSDataOutputStream is not closed.
299         */
300        @Override
301        public void close() throws IOException {
302          if ((state == State.CLOSED)) {
303            return;
304          }
305          try {
306            // First try the normal finish.
307            // Terminate upon the first Exception.
308            if (errorCount == 0) {
309              if (state != State.READY) {
310                throw new IllegalStateException(
311                    "Cannot close TFile in the middle of key-value insertion.");
312              }
313    
314              finishDataBlock(true);
315    
316              // first, write out data:TFile.meta
317              BlockAppender outMeta =
318                  writerBCF
319                      .prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE);
320              try {
321                tfileMeta.write(outMeta);
322              } finally {
323                outMeta.close();
324              }
325    
326              // second, write out data:TFile.index
327              BlockAppender outIndex =
328                  writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME);
329              try {
330                tfileIndex.write(outIndex);
331              } finally {
332                outIndex.close();
333              }
334    
335              writerBCF.close();
336            }
337          } finally {
338            IOUtils.cleanup(LOG, blkAppender, writerBCF);
339            blkAppender = null;
340            writerBCF = null;
341            state = State.CLOSED;
342          }
343        }
344    
345        /**
346         * Adding a new key-value pair to the TFile. This is synonymous to
347         * append(key, 0, key.length, value, 0, value.length)
348         * 
349         * @param key
350         *          Buffer for key.
351         * @param value
352         *          Buffer for value.
353         * @throws IOException
354         */
355        public void append(byte[] key, byte[] value) throws IOException {
356          append(key, 0, key.length, value, 0, value.length);
357        }
358    
359        /**
360         * Adding a new key-value pair to TFile.
361         * 
362         * @param key
363         *          buffer for key.
364         * @param koff
365         *          offset in key buffer.
366         * @param klen
367         *          length of key.
368         * @param value
369         *          buffer for value.
370         * @param voff
371         *          offset in value buffer.
372         * @param vlen
373         *          length of value.
374         * @throws IOException
375         *           Upon IO errors.
376         *           <p>
377         *           If an exception is thrown, the TFile will be in an inconsistent
378         *           state. The only legitimate call after that would be close
379         */
380        public void append(byte[] key, int koff, int klen, byte[] value, int voff,
381            int vlen) throws IOException {
382          if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) {
383            throw new IndexOutOfBoundsException(
384                "Bad key buffer offset-length combination.");
385          }
386    
387          if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) {
388            throw new IndexOutOfBoundsException(
389                "Bad value buffer offset-length combination.");
390          }
391    
392          try {
393            DataOutputStream dosKey = prepareAppendKey(klen);
394            try {
395              ++errorCount;
396              dosKey.write(key, koff, klen);
397              --errorCount;
398            } finally {
399              dosKey.close();
400            }
401    
402            DataOutputStream dosValue = prepareAppendValue(vlen);
403            try {
404              ++errorCount;
405              dosValue.write(value, voff, vlen);
406              --errorCount;
407            } finally {
408              dosValue.close();
409            }
410          } finally {
411            state = State.READY;
412          }
413        }
414    
415        /**
416         * Helper class to register key after close call on key append stream.
417         */
418        private class KeyRegister extends DataOutputStream {
419          private final int expectedLength;
420          private boolean closed = false;
421    
422          public KeyRegister(int len) {
423            super(currentKeyBufferOS);
424            if (len >= 0) {
425              currentKeyBufferOS.reset(len);
426            } else {
427              currentKeyBufferOS.reset();
428            }
429            expectedLength = len;
430          }
431    
432          @Override
433          public void close() throws IOException {
434            if (closed == true) {
435              return;
436            }
437    
438            try {
439              ++errorCount;
440              byte[] key = currentKeyBufferOS.getBuffer();
441              int len = currentKeyBufferOS.size();
442              /**
443               * verify length.
444               */
445              if (expectedLength >= 0 && expectedLength != len) {
446                throw new IOException("Incorrect key length: expected="
447                    + expectedLength + " actual=" + len);
448              }
449    
450              Utils.writeVInt(blkAppender, len);
451              blkAppender.write(key, 0, len);
452              if (tfileIndex.getFirstKey() == null) {
453                tfileIndex.setFirstKey(key, 0, len);
454              }
455    
456              if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
457                byte[] lastKey = lastKeyBufferOS.getBuffer();
458                int lastLen = lastKeyBufferOS.size();
459                if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
460                    lastLen) < 0) {
461                  throw new IOException("Keys are not added in sorted order");
462                }
463              }
464    
465              BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
466              currentKeyBufferOS = lastKeyBufferOS;
467              lastKeyBufferOS = tmp;
468              --errorCount;
469            } finally {
470              closed = true;
471              state = State.END_KEY;
472            }
473          }
474        }
475    
476        /**
477         * Helper class to register value after close call on value append stream.
478         */
479        private class ValueRegister extends DataOutputStream {
480          private boolean closed = false;
481    
482          public ValueRegister(OutputStream os) {
483            super(os);
484          }
485    
486          // Avoiding flushing call to down stream.
487          @Override
488          public void flush() {
489            // do nothing
490          }
491    
492          @Override
493          public void close() throws IOException {
494            if (closed == true) {
495              return;
496            }
497    
498            try {
499              ++errorCount;
500              super.close();
501              blkRecordCount++;
502              // bump up the total record count in the whole file
503              tfileMeta.incRecordCount();
504              finishDataBlock(false);
505              --errorCount;
506            } finally {
507              closed = true;
508              state = State.READY;
509            }
510          }
511        }
512    
513        /**
514         * Obtain an output stream for writing a key into TFile. This may only be
515         * called when there is no active Key appending stream or value appending
516         * stream.
517         * 
518         * @param length
519         *          The expected length of the key. If length of the key is not
520         *          known, set length = -1. Otherwise, the application must write
521         *          exactly as many bytes as specified here before calling close on
522         *          the returned output stream.
523         * @return The key appending output stream.
524         * @throws IOException
525         * 
526         */
527        public DataOutputStream prepareAppendKey(int length) throws IOException {
528          if (state != State.READY) {
529            throw new IllegalStateException("Incorrect state to start a new key: "
530                + state.name());
531          }
532    
533          initDataBlock();
534          DataOutputStream ret = new KeyRegister(length);
535          state = State.IN_KEY;
536          return ret;
537        }
538    
539        /**
540         * Obtain an output stream for writing a value into TFile. This may only be
541         * called right after a key appending operation (the key append stream must
542         * be closed).
543         * 
544         * @param length
545         *          The expected length of the value. If length of the value is not
546         *          known, set length = -1. Otherwise, the application must write
547         *          exactly as many bytes as specified here before calling close on
548         *          the returned output stream. Advertising the value size up-front
549         *          guarantees that the value is encoded in one chunk, and avoids
550         *          intermediate chunk buffering.
551         * @throws IOException
552         * 
553         */
554        public DataOutputStream prepareAppendValue(int length) throws IOException {
555          if (state != State.END_KEY) {
556            throw new IllegalStateException(
557                "Incorrect state to start a new value: " + state.name());
558          }
559    
560          DataOutputStream ret;
561    
562          // unknown length
563          if (length < 0) {
564            if (valueBuffer == null) {
565              valueBuffer = new byte[getChunkBufferSize(conf)];
566            }
567            ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer));
568          } else {
569            ret =
570                new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length));
571          }
572    
573          state = State.IN_VALUE;
574          return ret;
575        }
576    
577        /**
578         * Obtain an output stream for creating a meta block. This function may not
579         * be called when there is a key append stream or value append stream
580         * active. No more key-value insertion is allowed after a meta data block
581         * has been added to TFile.
582         * 
583         * @param name
584         *          Name of the meta block.
585         * @param compressName
586         *          Name of the compression algorithm to be used. Must be one of the
587         *          strings returned by
588         *          {@link TFile#getSupportedCompressionAlgorithms()}.
589         * @return A DataOutputStream that can be used to write Meta Block data.
590         *         Closing the stream would signal the ending of the block.
591         * @throws IOException
592         * @throws MetaBlockAlreadyExists
593         *           the Meta Block with the same name already exists.
594         */
595        public DataOutputStream prepareMetaBlock(String name, String compressName)
596            throws IOException, MetaBlockAlreadyExists {
597          if (state != State.READY) {
598            throw new IllegalStateException(
599                "Incorrect state to start a Meta Block: " + state.name());
600          }
601    
602          finishDataBlock(true);
603          DataOutputStream outputStream =
604              writerBCF.prepareMetaBlock(name, compressName);
605          return outputStream;
606        }
607    
608        /**
609         * Obtain an output stream for creating a meta block. This function may not
610         * be called when there is a key append stream or value append stream
611         * active. No more key-value insertion is allowed after a meta data block
612         * has been added to TFile. Data will be compressed using the default
613         * compressor as defined in Writer's constructor.
614         * 
615         * @param name
616         *          Name of the meta block.
617         * @return A DataOutputStream that can be used to write Meta Block data.
618         *         Closing the stream would signal the ending of the block.
619         * @throws IOException
620         * @throws MetaBlockAlreadyExists
621         *           the Meta Block with the same name already exists.
622         */
623        public DataOutputStream prepareMetaBlock(String name) throws IOException,
624            MetaBlockAlreadyExists {
625          if (state != State.READY) {
626            throw new IllegalStateException(
627                "Incorrect state to start a Meta Block: " + state.name());
628          }
629    
630          finishDataBlock(true);
631          return writerBCF.prepareMetaBlock(name);
632        }
633    
634        /**
635         * Check if we need to start a new data block.
636         * 
637         * @throws IOException
638         */
639        private void initDataBlock() throws IOException {
640          // for each new block, get a new appender
641          if (blkAppender == null) {
642            blkAppender = writerBCF.prepareDataBlock();
643          }
644        }
645    
646        /**
647         * Close the current data block if necessary.
648         * 
649         * @param bForceFinish
650         *          Force the closure regardless of the block size.
651         * @throws IOException
652         */
653        void finishDataBlock(boolean bForceFinish) throws IOException {
654          if (blkAppender == null) {
655            return;
656          }
657    
658          // exceeded the size limit, do the compression and finish the block
659          if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) {
660            // keep tracks of the last key of each data block, no padding
661            // for now
662            TFileIndexEntry keyLast =
663                new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS
664                    .size(), blkRecordCount);
665            tfileIndex.addEntry(keyLast);
666            // close the appender
667            blkAppender.close();
668            blkAppender = null;
669            blkRecordCount = 0;
670          }
671        }
672      }
673    
674      /**
675       * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner.
676       * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}
677       * ) , a portion of TFile based on byte offsets (
678       * {@link Reader#createScannerByByteRange(long, long)}), or a portion of TFile with keys
679       * fall in a certain key range (for sorted TFile only,
680       * {@link Reader#createScannerByKey(byte[], byte[])} or
681       * {@link Reader#createScannerByKey(RawComparable, RawComparable)}).
682       */
683      @InterfaceStability.Evolving
684      public static class Reader implements Closeable {
685        // The underlying BCFile reader.
686        final BCFile.Reader readerBCF;
687    
688        // TFile index, it is loaded lazily.
689        TFileIndex tfileIndex = null;
690        final TFileMeta tfileMeta;
691        final BytesComparator comparator;
692    
693        // global begin and end locations.
694        private final Location begin;
695        private final Location end;
696    
697        /**
698         * Location representing a virtual position in the TFile.
699         */
700        static final class Location implements Comparable<Location>, Cloneable {
701          private int blockIndex;
702          // distance/offset from the beginning of the block
703          private long recordIndex;
704    
705          Location(int blockIndex, long recordIndex) {
706            set(blockIndex, recordIndex);
707          }
708    
709          void incRecordIndex() {
710            ++recordIndex;
711          }
712    
713          Location(Location other) {
714            set(other);
715          }
716    
717          int getBlockIndex() {
718            return blockIndex;
719          }
720    
721          long getRecordIndex() {
722            return recordIndex;
723          }
724    
725          void set(int blockIndex, long recordIndex) {
726            if ((blockIndex | recordIndex) < 0) {
727              throw new IllegalArgumentException(
728                  "Illegal parameter for BlockLocation.");
729            }
730            this.blockIndex = blockIndex;
731            this.recordIndex = recordIndex;
732          }
733    
734          void set(Location other) {
735            set(other.blockIndex, other.recordIndex);
736          }
737    
738          /**
739           * @see java.lang.Comparable#compareTo(java.lang.Object)
740           */
741          @Override
742          public int compareTo(Location other) {
743            return compareTo(other.blockIndex, other.recordIndex);
744          }
745    
746          int compareTo(int bid, long rid) {
747            if (this.blockIndex == bid) {
748              long ret = this.recordIndex - rid;
749              if (ret > 0) return 1;
750              if (ret < 0) return -1;
751              return 0;
752            }
753            return this.blockIndex - bid;
754          }
755    
756          /**
757           * @see java.lang.Object#clone()
758           */
759          @Override
760          protected Location clone() {
761            return new Location(blockIndex, recordIndex);
762          }
763    
764          /**
765           * @see java.lang.Object#hashCode()
766           */
767          @Override
768          public int hashCode() {
769            final int prime = 31;
770            int result = prime + blockIndex;
771            result = (int) (prime * result + recordIndex);
772            return result;
773          }
774    
775          /**
776           * @see java.lang.Object#equals(java.lang.Object)
777           */
778          @Override
779          public boolean equals(Object obj) {
780            if (this == obj) return true;
781            if (obj == null) return false;
782            if (getClass() != obj.getClass()) return false;
783            Location other = (Location) obj;
784            if (blockIndex != other.blockIndex) return false;
785            if (recordIndex != other.recordIndex) return false;
786            return true;
787          }
788        }
789    
790        /**
791         * Constructor
792         * 
793         * @param fsdis
794         *          FS input stream of the TFile.
795         * @param fileLength
796         *          The length of TFile. This is required because we have no easy
797         *          way of knowing the actual size of the input file through the
798         *          File input stream.
799         * @param conf
800         * @throws IOException
801         */
802        public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf)
803            throws IOException {
804          readerBCF = new BCFile.Reader(fsdis, fileLength, conf);
805    
806          // first, read TFile meta
807          BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME);
808          try {
809            tfileMeta = new TFileMeta(brMeta);
810          } finally {
811            brMeta.close();
812          }
813    
814          comparator = tfileMeta.getComparator();
815          // Set begin and end locations.
816          begin = new Location(0, 0);
817          end = new Location(readerBCF.getBlockCount(), 0);
818        }
819    
820        /**
821         * Close the reader. The state of the Reader object is undefined after
822         * close. Calling close() for multiple times has no effect.
823         */
824        @Override
825        public void close() throws IOException {
826          readerBCF.close();
827        }
828    
829        /**
830         * Get the begin location of the TFile.
831         * 
832         * @return If TFile is not empty, the location of the first key-value pair.
833         *         Otherwise, it returns end().
834         */
835        Location begin() {
836          return begin;
837        }
838    
839        /**
840         * Get the end location of the TFile.
841         * 
842         * @return The location right after the last key-value pair in TFile.
843         */
844        Location end() {
845          return end;
846        }
847    
848        /**
849         * Get the string representation of the comparator.
850         * 
851         * @return If the TFile is not sorted by keys, an empty string will be
852         *         returned. Otherwise, the actual comparator string that is
853         *         provided during the TFile creation time will be returned.
854         */
855        public String getComparatorName() {
856          return tfileMeta.getComparatorString();
857        }
858    
859        /**
860         * Is the TFile sorted?
861         * 
862         * @return true if TFile is sorted.
863         */
864        public boolean isSorted() {
865          return tfileMeta.isSorted();
866        }
867    
868        /**
869         * Get the number of key-value pair entries in TFile.
870         * 
871         * @return the number of key-value pairs in TFile
872         */
873        public long getEntryCount() {
874          return tfileMeta.getRecordCount();
875        }
876    
877        /**
878         * Lazily loading the TFile index.
879         * 
880         * @throws IOException
881         */
882        synchronized void checkTFileDataIndex() throws IOException {
883          if (tfileIndex == null) {
884            BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME);
885            try {
886              tfileIndex =
887                  new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta
888                      .getComparator());
889            } finally {
890              brIndex.close();
891            }
892          }
893        }
894    
895        /**
896         * Get the first key in the TFile.
897         * 
898         * @return The first key in the TFile.
899         * @throws IOException
900         */
901        public RawComparable getFirstKey() throws IOException {
902          checkTFileDataIndex();
903          return tfileIndex.getFirstKey();
904        }
905    
906        /**
907         * Get the last key in the TFile.
908         * 
909         * @return The last key in the TFile.
910         * @throws IOException
911         */
912        public RawComparable getLastKey() throws IOException {
913          checkTFileDataIndex();
914          return tfileIndex.getLastKey();
915        }
916    
917        /**
918         * Get a Comparator object to compare Entries. It is useful when you want
919         * stores the entries in a collection (such as PriorityQueue) and perform
920         * sorting or comparison among entries based on the keys without copying out
921         * the key.
922         * 
923         * @return An Entry Comparator..
924         */
925        public Comparator<Scanner.Entry> getEntryComparator() {
926          if (!isSorted()) {
927            throw new RuntimeException(
928                "Entries are not comparable for unsorted TFiles");
929          }
930    
931          return new Comparator<Scanner.Entry>() {
932            /**
933             * Provide a customized comparator for Entries. This is useful if we
934             * have a collection of Entry objects. However, if the Entry objects
935             * come from different TFiles, users must ensure that those TFiles share
936             * the same RawComparator.
937             */
938            @Override
939            public int compare(Scanner.Entry o1, Scanner.Entry o2) {
940              return comparator.compare(o1.getKeyBuffer(), 0, o1.getKeyLength(), o2
941                  .getKeyBuffer(), 0, o2.getKeyLength());
942            }
943          };
944        }
945    
946        /**
947         * Get an instance of the RawComparator that is constructed based on the
948         * string comparator representation.
949         * 
950         * @return a Comparator that can compare RawComparable's.
951         */
952        public Comparator<RawComparable> getComparator() {
953          return comparator;
954        }
955    
956        /**
957         * Stream access to a meta block.``
958         * 
959         * @param name
960         *          The name of the meta block.
961         * @return The input stream.
962         * @throws IOException
963         *           on I/O error.
964         * @throws MetaBlockDoesNotExist
965         *           If the meta block with the name does not exist.
966         */
967        public DataInputStream getMetaBlock(String name) throws IOException,
968            MetaBlockDoesNotExist {
969          return readerBCF.getMetaBlock(name);
970        }
971    
972        /**
973         * if greater is true then returns the beginning location of the block
974         * containing the key strictly greater than input key. if greater is false
975         * then returns the beginning location of the block greater than equal to
976         * the input key
977         * 
978         * @param key
979         *          the input key
980         * @param greater
981         *          boolean flag
982         * @return
983         * @throws IOException
984         */
985        Location getBlockContainsKey(RawComparable key, boolean greater)
986            throws IOException {
987          if (!isSorted()) {
988            throw new RuntimeException("Seeking in unsorted TFile");
989          }
990          checkTFileDataIndex();
991          int blkIndex =
992              (greater) ? tfileIndex.upperBound(key) : tfileIndex.lowerBound(key);
993          if (blkIndex < 0) return end;
994          return new Location(blkIndex, 0);
995        }
996    
997        Location getLocationByRecordNum(long recNum) throws IOException {
998          checkTFileDataIndex();
999          return tfileIndex.getLocationByRecordNum(recNum);
1000        }
1001    
1002        long getRecordNumByLocation(Location location) throws IOException {
1003          checkTFileDataIndex();
1004          return tfileIndex.getRecordNumByLocation(location);      
1005        }
1006        
1007        int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
1008          if (!isSorted()) {
1009            throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
1010          }
1011          return comparator.compare(a, o1, l1, b, o2, l2);
1012        }
1013    
1014        int compareKeys(RawComparable a, RawComparable b) {
1015          if (!isSorted()) {
1016            throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
1017          }
1018          return comparator.compare(a, b);
1019        }
1020    
1021        /**
1022         * Get the location pointing to the beginning of the first key-value pair in
1023         * a compressed block whose byte offset in the TFile is greater than or
1024         * equal to the specified offset.
1025         * 
1026         * @param offset
1027         *          the user supplied offset.
1028         * @return the location to the corresponding entry; or end() if no such
1029         *         entry exists.
1030         */
1031        Location getLocationNear(long offset) {
1032          int blockIndex = readerBCF.getBlockIndexNear(offset);
1033          if (blockIndex == -1) return end;
1034          return new Location(blockIndex, 0);
1035        }
1036    
1037        /**
1038         * Get the RecordNum for the first key-value pair in a compressed block
1039         * whose byte offset in the TFile is greater than or equal to the specified
1040         * offset.
1041         * 
1042         * @param offset
1043         *          the user supplied offset.
1044         * @return the RecordNum to the corresponding entry. If no such entry
1045         *         exists, it returns the total entry count.
1046         * @throws IOException
1047         */
1048        public long getRecordNumNear(long offset) throws IOException {
1049          return getRecordNumByLocation(getLocationNear(offset));
1050        }
1051        
1052        /**
1053         * Get a sample key that is within a block whose starting offset is greater
1054         * than or equal to the specified offset.
1055         * 
1056         * @param offset
1057         *          The file offset.
1058         * @return the key that fits the requirement; or null if no such key exists
1059         *         (which could happen if the offset is close to the end of the
1060         *         TFile).
1061         * @throws IOException
1062         */
1063        public RawComparable getKeyNear(long offset) throws IOException {
1064          int blockIndex = readerBCF.getBlockIndexNear(offset);
1065          if (blockIndex == -1) return null;
1066          checkTFileDataIndex();
1067          return new ByteArray(tfileIndex.getEntry(blockIndex).key);
1068        }
1069    
1070        /**
1071         * Get a scanner than can scan the whole TFile.
1072         * 
1073         * @return The scanner object. A valid Scanner is always returned even if
1074         *         the TFile is empty.
1075         * @throws IOException
1076         */
1077        public Scanner createScanner() throws IOException {
1078          return new Scanner(this, begin, end);
1079        }
1080    
1081        /**
1082         * Get a scanner that covers a portion of TFile based on byte offsets.
1083         * 
1084         * @param offset
1085         *          The beginning byte offset in the TFile.
1086         * @param length
1087         *          The length of the region.
1088         * @return The actual coverage of the returned scanner tries to match the
1089         *         specified byte-region but always round up to the compression
1090         *         block boundaries. It is possible that the returned scanner
1091         *         contains zero key-value pairs even if length is positive.
1092         * @throws IOException
1093         */
1094        public Scanner createScannerByByteRange(long offset, long length) throws IOException {
1095          return new Scanner(this, offset, offset + length);
1096        }
1097    
1098        /**
1099         * Get a scanner that covers a portion of TFile based on keys.
1100         * 
1101         * @param beginKey
1102         *          Begin key of the scan (inclusive). If null, scan from the first
1103         *          key-value entry of the TFile.
1104         * @param endKey
1105         *          End key of the scan (exclusive). If null, scan up to the last
1106         *          key-value entry of the TFile.
1107         * @return The actual coverage of the returned scanner will cover all keys
1108         *         greater than or equal to the beginKey and less than the endKey.
1109         * @throws IOException
1110         * 
1111         * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead.
1112         */
1113        @Deprecated
1114        public Scanner createScanner(byte[] beginKey, byte[] endKey)
1115          throws IOException {
1116          return createScannerByKey(beginKey, endKey);
1117        }
1118        
1119        /**
1120         * Get a scanner that covers a portion of TFile based on keys.
1121         * 
1122         * @param beginKey
1123         *          Begin key of the scan (inclusive). If null, scan from the first
1124         *          key-value entry of the TFile.
1125         * @param endKey
1126         *          End key of the scan (exclusive). If null, scan up to the last
1127         *          key-value entry of the TFile.
1128         * @return The actual coverage of the returned scanner will cover all keys
1129         *         greater than or equal to the beginKey and less than the endKey.
1130         * @throws IOException
1131         */
1132        public Scanner createScannerByKey(byte[] beginKey, byte[] endKey)
1133            throws IOException {
1134          return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey,
1135              0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
1136              0, endKey.length));
1137        }
1138    
1139        /**
1140         * Get a scanner that covers a specific key range.
1141         * 
1142         * @param beginKey
1143         *          Begin key of the scan (inclusive). If null, scan from the first
1144         *          key-value entry of the TFile.
1145         * @param endKey
1146         *          End key of the scan (exclusive). If null, scan up to the last
1147         *          key-value entry of the TFile.
1148         * @return The actual coverage of the returned scanner will cover all keys
1149         *         greater than or equal to the beginKey and less than the endKey.
1150         * @throws IOException
1151         * 
1152         * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)}
1153         *             instead.
1154         */
1155        @Deprecated
1156        public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
1157            throws IOException {
1158          return createScannerByKey(beginKey, endKey);
1159        }
1160    
1161        /**
1162         * Get a scanner that covers a specific key range.
1163         * 
1164         * @param beginKey
1165         *          Begin key of the scan (inclusive). If null, scan from the first
1166         *          key-value entry of the TFile.
1167         * @param endKey
1168         *          End key of the scan (exclusive). If null, scan up to the last
1169         *          key-value entry of the TFile.
1170         * @return The actual coverage of the returned scanner will cover all keys
1171         *         greater than or equal to the beginKey and less than the endKey.
1172         * @throws IOException
1173         */
1174        public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey)
1175            throws IOException {
1176          if ((beginKey != null) && (endKey != null)
1177              && (compareKeys(beginKey, endKey) >= 0)) {
1178            return new Scanner(this, beginKey, beginKey);
1179          }
1180          return new Scanner(this, beginKey, endKey);
1181        }
1182    
1183        /**
1184         * Create a scanner that covers a range of records.
1185         * 
1186         * @param beginRecNum
1187         *          The RecordNum for the first record (inclusive).
1188         * @param endRecNum
1189         *          The RecordNum for the last record (exclusive). To scan the whole
1190         *          file, either specify endRecNum==-1 or endRecNum==getEntryCount().
1191         * @return The TFile scanner that covers the specified range of records.
1192         * @throws IOException
1193         */
1194        public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum)
1195            throws IOException {
1196          if (beginRecNum < 0) beginRecNum = 0;
1197          if (endRecNum < 0 || endRecNum > getEntryCount()) {
1198            endRecNum = getEntryCount();
1199          }
1200          return new Scanner(this, getLocationByRecordNum(beginRecNum),
1201              getLocationByRecordNum(endRecNum));
1202        }
1203    
1204        /**
1205         * The TFile Scanner. The Scanner has an implicit cursor, which, upon
1206         * creation, points to the first key-value pair in the scan range. If the
1207         * scan range is empty, the cursor will point to the end of the scan range.
1208         * <p>
1209         * Use {@link Scanner#atEnd()} to test whether the cursor is at the end
1210         * location of the scanner.
1211         * <p>
1212         * Use {@link Scanner#advance()} to move the cursor to the next key-value
1213         * pair (or end if none exists). Use seekTo methods (
1214         * {@link Scanner#seekTo(byte[])} or
1215         * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary
1216         * location in the covered range (including backward seeking). Use
1217         * {@link Scanner#rewind()} to seek back to the beginning of the scanner.
1218         * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner.
1219         * <p>
1220         * Actual keys and values may be obtained through {@link Scanner.Entry}
1221         * object, which is obtained through {@link Scanner#entry()}.
1222         */
1223        public static class Scanner implements Closeable {
1224          // The underlying TFile reader.
1225          final Reader reader;
1226          // current block (null if reaching end)
1227          private BlockReader blkReader;
1228    
1229          Location beginLocation;
1230          Location endLocation;
1231          Location currentLocation;
1232    
1233          // flag to ensure value is only examined once.
1234          boolean valueChecked = false;
1235          // reusable buffer for keys.
1236          final byte[] keyBuffer;
1237          // length of key, -1 means key is invalid.
1238          int klen = -1;
1239    
1240          static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024;
1241          BytesWritable valTransferBuffer;
1242    
1243          DataInputBuffer keyDataInputStream;
1244          ChunkDecoder valueBufferInputStream;
1245          DataInputStream valueDataInputStream;
1246          // vlen == -1 if unknown.
1247          int vlen;
1248    
1249          /**
1250           * Constructor
1251           * 
1252           * @param reader
1253           *          The TFile reader object.
1254           * @param offBegin
1255           *          Begin byte-offset of the scan.
1256           * @param offEnd
1257           *          End byte-offset of the scan.
1258           * @throws IOException
1259           * 
1260           *           The offsets will be rounded to the beginning of a compressed
1261           *           block whose offset is greater than or equal to the specified
1262           *           offset.
1263           */
1264          protected Scanner(Reader reader, long offBegin, long offEnd)
1265              throws IOException {
1266            this(reader, reader.getLocationNear(offBegin), reader
1267                .getLocationNear(offEnd));
1268          }
1269    
1270          /**
1271           * Constructor
1272           * 
1273           * @param reader
1274           *          The TFile reader object.
1275           * @param begin
1276           *          Begin location of the scan.
1277           * @param end
1278           *          End location of the scan.
1279           * @throws IOException
1280           */
1281          Scanner(Reader reader, Location begin, Location end) throws IOException {
1282            this.reader = reader;
1283            // ensure the TFile index is loaded throughout the life of scanner.
1284            reader.checkTFileDataIndex();
1285            beginLocation = begin;
1286            endLocation = end;
1287    
1288            valTransferBuffer = new BytesWritable();
1289            // TODO: remember the longest key in a TFile, and use it to replace
1290            // MAX_KEY_SIZE.
1291            keyBuffer = new byte[MAX_KEY_SIZE];
1292            keyDataInputStream = new DataInputBuffer();
1293            valueBufferInputStream = new ChunkDecoder();
1294            valueDataInputStream = new DataInputStream(valueBufferInputStream);
1295    
1296            if (beginLocation.compareTo(endLocation) >= 0) {
1297              currentLocation = new Location(endLocation);
1298            } else {
1299              currentLocation = new Location(0, 0);
1300              initBlock(beginLocation.getBlockIndex());
1301              inBlockAdvance(beginLocation.getRecordIndex());
1302            }
1303          }
1304    
1305          /**
1306           * Constructor
1307           * 
1308           * @param reader
1309           *          The TFile reader object.
1310           * @param beginKey
1311           *          Begin key of the scan. If null, scan from the first <K,V>
1312           *          entry of the TFile.
1313           * @param endKey
1314           *          End key of the scan. If null, scan up to the last <K, V> entry
1315           *          of the TFile.
1316           * @throws IOException
1317           */
1318          protected Scanner(Reader reader, RawComparable beginKey,
1319              RawComparable endKey) throws IOException {
1320            this(reader, (beginKey == null) ? reader.begin() : reader
1321                .getBlockContainsKey(beginKey, false), reader.end());
1322            if (beginKey != null) {
1323              inBlockAdvance(beginKey, false);
1324              beginLocation.set(currentLocation);
1325            }
1326            if (endKey != null) {
1327              seekTo(endKey, false);
1328              endLocation.set(currentLocation);
1329              seekTo(beginLocation);
1330            }
1331          }
1332    
1333          /**
1334           * Move the cursor to the first entry whose key is greater than or equal
1335           * to the input key. Synonymous to seekTo(key, 0, key.length). The entry
1336           * returned by the previous entry() call will be invalid.
1337           * 
1338           * @param key
1339           *          The input key
1340           * @return true if we find an equal key.
1341           * @throws IOException
1342           */
1343          public boolean seekTo(byte[] key) throws IOException {
1344            return seekTo(key, 0, key.length);
1345          }
1346    
1347          /**
1348           * Move the cursor to the first entry whose key is greater than or equal
1349           * to the input key. The entry returned by the previous entry() call will
1350           * be invalid.
1351           * 
1352           * @param key
1353           *          The input key
1354           * @param keyOffset
1355           *          offset in the key buffer.
1356           * @param keyLen
1357           *          key buffer length.
1358           * @return true if we find an equal key; false otherwise.
1359           * @throws IOException
1360           */
1361          public boolean seekTo(byte[] key, int keyOffset, int keyLen)
1362              throws IOException {
1363            return seekTo(new ByteArray(key, keyOffset, keyLen), false);
1364          }
1365    
1366          private boolean seekTo(RawComparable key, boolean beyond)
1367              throws IOException {
1368            Location l = reader.getBlockContainsKey(key, beyond);
1369            if (l.compareTo(beginLocation) < 0) {
1370              l = beginLocation;
1371            } else if (l.compareTo(endLocation) >= 0) {
1372              seekTo(endLocation);
1373              return false;
1374            }
1375    
1376            // check if what we are seeking is in the later part of the current
1377            // block.
1378            if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex())
1379                || (compareCursorKeyTo(key) >= 0)) {
1380              // sorry, we must seek to a different location first.
1381              seekTo(l);
1382            }
1383    
1384            return inBlockAdvance(key, beyond);
1385          }
1386    
1387          /**
1388           * Move the cursor to the new location. The entry returned by the previous
1389           * entry() call will be invalid.
1390           * 
1391           * @param l
1392           *          new cursor location. It must fall between the begin and end
1393           *          location of the scanner.
1394           * @throws IOException
1395           */
1396          private void seekTo(Location l) throws IOException {
1397            if (l.compareTo(beginLocation) < 0) {
1398              throw new IllegalArgumentException(
1399                  "Attempt to seek before the begin location.");
1400            }
1401    
1402            if (l.compareTo(endLocation) > 0) {
1403              throw new IllegalArgumentException(
1404                  "Attempt to seek after the end location.");
1405            }
1406    
1407            if (l.compareTo(endLocation) == 0) {
1408              parkCursorAtEnd();
1409              return;
1410            }
1411    
1412            if (l.getBlockIndex() != currentLocation.getBlockIndex()) {
1413              // going to a totally different block
1414              initBlock(l.getBlockIndex());
1415            } else {
1416              if (valueChecked) {
1417                // may temporarily go beyond the last record in the block (in which
1418                // case the next if loop will always be true).
1419                inBlockAdvance(1);
1420              }
1421              if (l.getRecordIndex() < currentLocation.getRecordIndex()) {
1422                initBlock(l.getBlockIndex());
1423              }
1424            }
1425    
1426            inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex());
1427    
1428            return;
1429          }
1430    
1431          /**
1432           * Rewind to the first entry in the scanner. The entry returned by the
1433           * previous entry() call will be invalid.
1434           * 
1435           * @throws IOException
1436           */
1437          public void rewind() throws IOException {
1438            seekTo(beginLocation);
1439          }
1440    
1441          /**
1442           * Seek to the end of the scanner. The entry returned by the previous
1443           * entry() call will be invalid.
1444           * 
1445           * @throws IOException
1446           */
1447          public void seekToEnd() throws IOException {
1448            parkCursorAtEnd();
1449          }
1450    
1451          /**
1452           * Move the cursor to the first entry whose key is greater than or equal
1453           * to the input key. Synonymous to lowerBound(key, 0, key.length). The
1454           * entry returned by the previous entry() call will be invalid.
1455           * 
1456           * @param key
1457           *          The input key
1458           * @throws IOException
1459           */
1460          public void lowerBound(byte[] key) throws IOException {
1461            lowerBound(key, 0, key.length);
1462          }
1463    
1464          /**
1465           * Move the cursor to the first entry whose key is greater than or equal
1466           * to the input key. The entry returned by the previous entry() call will
1467           * be invalid.
1468           * 
1469           * @param key
1470           *          The input key
1471           * @param keyOffset
1472           *          offset in the key buffer.
1473           * @param keyLen
1474           *          key buffer length.
1475           * @throws IOException
1476           */
1477          public void lowerBound(byte[] key, int keyOffset, int keyLen)
1478              throws IOException {
1479            seekTo(new ByteArray(key, keyOffset, keyLen), false);
1480          }
1481    
1482          /**
1483           * Move the cursor to the first entry whose key is strictly greater than
1484           * the input key. Synonymous to upperBound(key, 0, key.length). The entry
1485           * returned by the previous entry() call will be invalid.
1486           * 
1487           * @param key
1488           *          The input key
1489           * @throws IOException
1490           */
1491          public void upperBound(byte[] key) throws IOException {
1492            upperBound(key, 0, key.length);
1493          }
1494    
1495          /**
1496           * Move the cursor to the first entry whose key is strictly greater than
1497           * the input key. The entry returned by the previous entry() call will be
1498           * invalid.
1499           * 
1500           * @param key
1501           *          The input key
1502           * @param keyOffset
1503           *          offset in the key buffer.
1504           * @param keyLen
1505           *          key buffer length.
1506           * @throws IOException
1507           */
1508          public void upperBound(byte[] key, int keyOffset, int keyLen)
1509              throws IOException {
1510            seekTo(new ByteArray(key, keyOffset, keyLen), true);
1511          }
1512    
1513          /**
1514           * Move the cursor to the next key-value pair. The entry returned by the
1515           * previous entry() call will be invalid.
1516           * 
1517           * @return true if the cursor successfully moves. False when cursor is
1518           *         already at the end location and cannot be advanced.
1519           * @throws IOException
1520           */
1521          public boolean advance() throws IOException {
1522            if (atEnd()) {
1523              return false;
1524            }
1525    
1526            int curBid = currentLocation.getBlockIndex();
1527            long curRid = currentLocation.getRecordIndex();
1528            long entriesInBlock = reader.getBlockEntryCount(curBid);
1529            if (curRid + 1 >= entriesInBlock) {
1530              if (endLocation.compareTo(curBid + 1, 0) <= 0) {
1531                // last entry in TFile.
1532                parkCursorAtEnd();
1533              } else {
1534                // last entry in Block.
1535                initBlock(curBid + 1);
1536              }
1537            } else {
1538              inBlockAdvance(1);
1539            }
1540            return true;
1541          }
1542    
1543          /**
1544           * Load a compressed block for reading. Expecting blockIndex is valid.
1545           * 
1546           * @throws IOException
1547           */
1548          private void initBlock(int blockIndex) throws IOException {
1549            klen = -1;
1550            if (blkReader != null) {
1551              try {
1552                blkReader.close();
1553              } finally {
1554                blkReader = null;
1555              }
1556            }
1557            blkReader = reader.getBlockReader(blockIndex);
1558            currentLocation.set(blockIndex, 0);
1559          }
1560    
1561          private void parkCursorAtEnd() throws IOException {
1562            klen = -1;
1563            currentLocation.set(endLocation);
1564            if (blkReader != null) {
1565              try {
1566                blkReader.close();
1567              } finally {
1568                blkReader = null;
1569              }
1570            }
1571          }
1572    
1573          /**
1574           * Close the scanner. Release all resources. The behavior of using the
1575           * scanner after calling close is not defined. The entry returned by the
1576           * previous entry() call will be invalid.
1577           */
1578          @Override
1579          public void close() throws IOException {
1580            parkCursorAtEnd();
1581          }
1582    
1583          /**
1584           * Is cursor at the end location?
1585           * 
1586           * @return true if the cursor is at the end location.
1587           */
1588          public boolean atEnd() {
1589            return (currentLocation.compareTo(endLocation) >= 0);
1590          }
1591    
1592          /**
1593           * check whether we have already successfully obtained the key. It also
1594           * initializes the valueInputStream.
1595           */
1596          void checkKey() throws IOException {
1597            if (klen >= 0) return;
1598            if (atEnd()) {
1599              throw new EOFException("No key-value to read");
1600            }
1601            klen = -1;
1602            vlen = -1;
1603            valueChecked = false;
1604    
1605            klen = Utils.readVInt(blkReader);
1606            blkReader.readFully(keyBuffer, 0, klen);
1607            valueBufferInputStream.reset(blkReader);
1608            if (valueBufferInputStream.isLastChunk()) {
1609              vlen = valueBufferInputStream.getRemain();
1610            }
1611          }
1612    
1613          /**
1614           * Get an entry to access the key and value.
1615           * 
1616           * @return The Entry object to access the key and value.
1617           * @throws IOException
1618           */
1619          public Entry entry() throws IOException {
1620            checkKey();
1621            return new Entry();
1622          }
1623    
1624          /**
1625           * Get the RecordNum corresponding to the entry pointed by the cursor.
1626           * @return The RecordNum corresponding to the entry pointed by the cursor.
1627           * @throws IOException
1628           */
1629          public long getRecordNum() throws IOException {
1630            return reader.getRecordNumByLocation(currentLocation);
1631          }
1632          
1633          /**
1634           * Internal API. Comparing the key at cursor to user-specified key.
1635           * 
1636           * @param other
1637           *          user-specified key.
1638           * @return negative if key at cursor is smaller than user key; 0 if equal;
1639           *         and positive if key at cursor greater than user key.
1640           * @throws IOException
1641           */
1642          int compareCursorKeyTo(RawComparable other) throws IOException {
1643            checkKey();
1644            return reader.compareKeys(keyBuffer, 0, klen, other.buffer(), other
1645                .offset(), other.size());
1646          }
1647    
1648          /**
1649           * Entry to a &lt;Key, Value&gt; pair.
1650           */
1651          public class Entry implements Comparable<RawComparable> {
1652            /**
1653             * Get the length of the key.
1654             * 
1655             * @return the length of the key.
1656             */
1657            public int getKeyLength() {
1658              return klen;
1659            }
1660    
1661            byte[] getKeyBuffer() {
1662              return keyBuffer;
1663            }
1664    
1665            /**
1666             * Copy the key and value in one shot into BytesWritables. This is
1667             * equivalent to getKey(key); getValue(value);
1668             * 
1669             * @param key
1670             *          BytesWritable to hold key.
1671             * @param value
1672             *          BytesWritable to hold value
1673             * @throws IOException
1674             */
1675            public void get(BytesWritable key, BytesWritable value)
1676                throws IOException {
1677              getKey(key);
1678              getValue(value);
1679            }
1680    
1681            /**
1682             * Copy the key into BytesWritable. The input BytesWritable will be
1683             * automatically resized to the actual key size.
1684             * 
1685             * @param key
1686             *          BytesWritable to hold the key.
1687             * @throws IOException
1688             */
1689            public int getKey(BytesWritable key) throws IOException {
1690              key.setSize(getKeyLength());
1691              getKey(key.getBytes());
1692              return key.getLength();
1693            }
1694    
1695            /**
1696             * Copy the value into BytesWritable. The input BytesWritable will be
1697             * automatically resized to the actual value size. The implementation
1698             * directly uses the buffer inside BytesWritable for storing the value.
1699             * The call does not require the value length to be known.
1700             * 
1701             * @param value
1702             * @throws IOException
1703             */
1704            public long getValue(BytesWritable value) throws IOException {
1705              DataInputStream dis = getValueStream();
1706              int size = 0;
1707              try {
1708                int remain;
1709                while ((remain = valueBufferInputStream.getRemain()) > 0) {
1710                  value.setSize(size + remain);
1711                  dis.readFully(value.getBytes(), size, remain);
1712                  size += remain;
1713                }
1714                return value.getLength();
1715              } finally {
1716                dis.close();
1717              }
1718            }
1719    
1720            /**
1721             * Writing the key to the output stream. This method avoids copying key
1722             * buffer from Scanner into user buffer, then writing to the output
1723             * stream.
1724             * 
1725             * @param out
1726             *          The output stream
1727             * @return the length of the key.
1728             * @throws IOException
1729             */
1730            public int writeKey(OutputStream out) throws IOException {
1731              out.write(keyBuffer, 0, klen);
1732              return klen;
1733            }
1734    
1735            /**
1736             * Writing the value to the output stream. This method avoids copying
1737             * value data from Scanner into user buffer, then writing to the output
1738             * stream. It does not require the value length to be known.
1739             * 
1740             * @param out
1741             *          The output stream
1742             * @return the length of the value
1743             * @throws IOException
1744             */
1745            public long writeValue(OutputStream out) throws IOException {
1746              DataInputStream dis = getValueStream();
1747              long size = 0;
1748              try {
1749                int chunkSize;
1750                while ((chunkSize = valueBufferInputStream.getRemain()) > 0) {
1751                  chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE);
1752                  valTransferBuffer.setSize(chunkSize);
1753                  dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize);
1754                  out.write(valTransferBuffer.getBytes(), 0, chunkSize);
1755                  size += chunkSize;
1756                }
1757                return size;
1758              } finally {
1759                dis.close();
1760              }
1761            }
1762    
1763            /**
1764             * Copy the key into user supplied buffer.
1765             * 
1766             * @param buf
1767             *          The buffer supplied by user. The length of the buffer must
1768             *          not be shorter than the key length.
1769             * @return The length of the key.
1770             * 
1771             * @throws IOException
1772             */
1773            public int getKey(byte[] buf) throws IOException {
1774              return getKey(buf, 0);
1775            }
1776    
1777            /**
1778             * Copy the key into user supplied buffer.
1779             * 
1780             * @param buf
1781             *          The buffer supplied by user.
1782             * @param offset
1783             *          The starting offset of the user buffer where we should copy
1784             *          the key into. Requiring the key-length + offset no greater
1785             *          than the buffer length.
1786             * @return The length of the key.
1787             * @throws IOException
1788             */
1789            public int getKey(byte[] buf, int offset) throws IOException {
1790              if ((offset | (buf.length - offset - klen)) < 0) {
1791                throw new IndexOutOfBoundsException(
1792                    "Bufer not enough to store the key");
1793              }
1794              System.arraycopy(keyBuffer, 0, buf, offset, klen);
1795              return klen;
1796            }
1797    
1798            /**
1799             * Streaming access to the key. Useful for desrializing the key into
1800             * user objects.
1801             * 
1802             * @return The input stream.
1803             */
1804            public DataInputStream getKeyStream() {
1805              keyDataInputStream.reset(keyBuffer, klen);
1806              return keyDataInputStream;
1807            }
1808    
1809            /**
1810             * Get the length of the value. isValueLengthKnown() must be tested
1811             * true.
1812             * 
1813             * @return the length of the value.
1814             */
1815            public int getValueLength() {
1816              if (vlen >= 0) {
1817                return vlen;
1818              }
1819    
1820              throw new RuntimeException("Value length unknown.");
1821            }
1822    
1823            /**
1824             * Copy value into user-supplied buffer. User supplied buffer must be
1825             * large enough to hold the whole value. The value part of the key-value
1826             * pair pointed by the current cursor is not cached and can only be
1827             * examined once. Calling any of the following functions more than once
1828             * without moving the cursor will result in exception:
1829             * {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
1830             * {@link #getValueStream}.
1831             * 
1832             * @return the length of the value. Does not require
1833             *         isValueLengthKnown() to be true.
1834             * @throws IOException
1835             * 
1836             */
1837            public int getValue(byte[] buf) throws IOException {
1838              return getValue(buf, 0);
1839            }
1840    
1841            /**
1842             * Copy value into user-supplied buffer. User supplied buffer must be
1843             * large enough to hold the whole value (starting from the offset). The
1844             * value part of the key-value pair pointed by the current cursor is not
1845             * cached and can only be examined once. Calling any of the following
1846             * functions more than once without moving the cursor will result in
1847             * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
1848             * {@link #getValueStream}.
1849             * 
1850             * @return the length of the value. Does not require
1851             *         isValueLengthKnown() to be true.
1852             * @throws IOException
1853             */
1854            public int getValue(byte[] buf, int offset) throws IOException {
1855              DataInputStream dis = getValueStream();
1856              try {
1857                if (isValueLengthKnown()) {
1858                  if ((offset | (buf.length - offset - vlen)) < 0) {
1859                    throw new IndexOutOfBoundsException(
1860                        "Buffer too small to hold value");
1861                  }
1862                  dis.readFully(buf, offset, vlen);
1863                  return vlen;
1864                }
1865    
1866                int nextOffset = offset;
1867                while (nextOffset < buf.length) {
1868                  int n = dis.read(buf, nextOffset, buf.length - nextOffset);
1869                  if (n < 0) {
1870                    break;
1871                  }
1872                  nextOffset += n;
1873                }
1874                if (dis.read() >= 0) {
1875                  // attempt to read one more byte to determine whether we reached
1876                  // the
1877                  // end or not.
1878                  throw new IndexOutOfBoundsException(
1879                      "Buffer too small to hold value");
1880                }
1881                return nextOffset - offset;
1882              } finally {
1883                dis.close();
1884              }
1885            }
1886    
1887            /**
1888             * Stream access to value. The value part of the key-value pair pointed
1889             * by the current cursor is not cached and can only be examined once.
1890             * Calling any of the following functions more than once without moving
1891             * the cursor will result in exception: {@link #getValue(byte[])},
1892             * {@link #getValue(byte[], int)}, {@link #getValueStream}.
1893             * 
1894             * @return The input stream for reading the value.
1895             * @throws IOException
1896             */
1897            public DataInputStream getValueStream() throws IOException {
1898              if (valueChecked == true) {
1899                throw new IllegalStateException(
1900                    "Attempt to examine value multiple times.");
1901              }
1902              valueChecked = true;
1903              return valueDataInputStream;
1904            }
1905    
1906            /**
1907             * Check whether it is safe to call getValueLength().
1908             * 
1909             * @return true if value length is known before hand. Values less than
1910             *         the chunk size will always have their lengths known before
1911             *         hand. Values that are written out as a whole (with advertised
1912             *         length up-front) will always have their lengths known in
1913             *         read.
1914             */
1915            public boolean isValueLengthKnown() {
1916              return (vlen >= 0);
1917            }
1918    
1919            /**
1920             * Compare the entry key to another key. Synonymous to compareTo(key, 0,
1921             * key.length).
1922             * 
1923             * @param buf
1924             *          The key buffer.
1925             * @return comparison result between the entry key with the input key.
1926             */
1927            public int compareTo(byte[] buf) {
1928              return compareTo(buf, 0, buf.length);
1929            }
1930    
1931            /**
1932             * Compare the entry key to another key. Synonymous to compareTo(new
1933             * ByteArray(buf, offset, length)
1934             * 
1935             * @param buf
1936             *          The key buffer
1937             * @param offset
1938             *          offset into the key buffer.
1939             * @param length
1940             *          the length of the key.
1941             * @return comparison result between the entry key with the input key.
1942             */
1943            public int compareTo(byte[] buf, int offset, int length) {
1944              return compareTo(new ByteArray(buf, offset, length));
1945            }
1946    
1947            /**
1948             * Compare an entry with a RawComparable object. This is useful when
1949             * Entries are stored in a collection, and we want to compare a user
1950             * supplied key.
1951             */
1952            @Override
1953            public int compareTo(RawComparable key) {
1954              return reader.compareKeys(keyBuffer, 0, getKeyLength(), key.buffer(),
1955                  key.offset(), key.size());
1956            }
1957    
1958            /**
1959             * Compare whether this and other points to the same key value.
1960             */
1961            @Override
1962            public boolean equals(Object other) {
1963              if (this == other) return true;
1964              if (!(other instanceof Entry)) return false;
1965              return ((Entry) other).compareTo(keyBuffer, 0, getKeyLength()) == 0;
1966            }
1967    
1968            @Override
1969            public int hashCode() {
1970              return WritableComparator.hashBytes(keyBuffer, 0, getKeyLength());
1971            }
1972          }
1973    
1974          /**
1975           * Advance cursor by n positions within the block.
1976           * 
1977           * @param n
1978           *          Number of key-value pairs to skip in block.
1979           * @throws IOException
1980           */
1981          private void inBlockAdvance(long n) throws IOException {
1982            for (long i = 0; i < n; ++i) {
1983              checkKey();
1984              if (!valueBufferInputStream.isClosed()) {
1985                valueBufferInputStream.close();
1986              }
1987              klen = -1;
1988              currentLocation.incRecordIndex();
1989            }
1990          }
1991    
1992          /**
1993           * Advance cursor in block until we find a key that is greater than or
1994           * equal to the input key.
1995           * 
1996           * @param key
1997           *          Key to compare.
1998           * @param greater
1999           *          advance until we find a key greater than the input key.
2000           * @return true if we find a equal key.
2001           * @throws IOException
2002           */
2003          private boolean inBlockAdvance(RawComparable key, boolean greater)
2004              throws IOException {
2005            int curBid = currentLocation.getBlockIndex();
2006            long entryInBlock = reader.getBlockEntryCount(curBid);
2007            if (curBid == endLocation.getBlockIndex()) {
2008              entryInBlock = endLocation.getRecordIndex();
2009            }
2010    
2011            while (currentLocation.getRecordIndex() < entryInBlock) {
2012              int cmp = compareCursorKeyTo(key);
2013              if (cmp > 0) return false;
2014              if (cmp == 0 && !greater) return true;
2015              if (!valueBufferInputStream.isClosed()) {
2016                valueBufferInputStream.close();
2017              }
2018              klen = -1;
2019              currentLocation.incRecordIndex();
2020            }
2021    
2022            throw new RuntimeException("Cannot find matching key in block.");
2023          }
2024        }
2025    
2026        long getBlockEntryCount(int curBid) {
2027          return tfileIndex.getEntry(curBid).entries();
2028        }
2029    
2030        BlockReader getBlockReader(int blockIndex) throws IOException {
2031          return readerBCF.getDataBlock(blockIndex);
2032        }
2033      }
2034    
2035      /**
2036       * Data structure representing "TFile.meta" meta block.
2037       */
2038      static final class TFileMeta {
2039        final static String BLOCK_NAME = "TFile.meta";
2040        final Version version;
2041        private long recordCount;
2042        private final String strComparator;
2043        private final BytesComparator comparator;
2044    
2045        // ctor for writes
2046        public TFileMeta(String comparator) {
2047          // set fileVersion to API version when we create it.
2048          version = TFile.API_VERSION;
2049          recordCount = 0;
2050          strComparator = (comparator == null) ? "" : comparator;
2051          this.comparator = makeComparator(strComparator);
2052        }
2053    
2054        // ctor for reads
2055        public TFileMeta(DataInput in) throws IOException {
2056          version = new Version(in);
2057          if (!version.compatibleWith(TFile.API_VERSION)) {
2058            throw new RuntimeException("Incompatible TFile fileVersion.");
2059          }
2060          recordCount = Utils.readVLong(in);
2061          strComparator = Utils.readString(in);
2062          comparator = makeComparator(strComparator);
2063        }
2064    
2065        @SuppressWarnings("unchecked")
2066        static BytesComparator makeComparator(String comparator) {
2067          if (comparator.length() == 0) {
2068            // unsorted keys
2069            return null;
2070          }
2071          if (comparator.equals(COMPARATOR_MEMCMP)) {
2072            // default comparator
2073            return new BytesComparator(new MemcmpRawComparator());
2074          } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
2075            String compClassName =
2076                comparator.substring(COMPARATOR_JCLASS.length()).trim();
2077            try {
2078              Class compClass = Class.forName(compClassName);
2079              // use its default ctor to create an instance
2080              return new BytesComparator((RawComparator<Object>) compClass
2081                  .newInstance());
2082            } catch (Exception e) {
2083              throw new IllegalArgumentException(
2084                  "Failed to instantiate comparator: " + comparator + "("
2085                      + e.toString() + ")");
2086            }
2087          } else {
2088            throw new IllegalArgumentException("Unsupported comparator: "
2089                + comparator);
2090          }
2091        }
2092    
2093        public void write(DataOutput out) throws IOException {
2094          TFile.API_VERSION.write(out);
2095          Utils.writeVLong(out, recordCount);
2096          Utils.writeString(out, strComparator);
2097        }
2098    
2099        public long getRecordCount() {
2100          return recordCount;
2101        }
2102    
2103        public void incRecordCount() {
2104          ++recordCount;
2105        }
2106    
2107        public boolean isSorted() {
2108          return !strComparator.isEmpty();
2109        }
2110    
2111        public String getComparatorString() {
2112          return strComparator;
2113        }
2114    
2115        public BytesComparator getComparator() {
2116          return comparator;
2117        }
2118    
2119        public Version getVersion() {
2120          return version;
2121        }
2122      } // END: class MetaTFileMeta
2123    
2124      /**
2125       * Data structure representing "TFile.index" meta block.
2126       */
2127      static class TFileIndex {
2128        final static String BLOCK_NAME = "TFile.index";
2129        private ByteArray firstKey;
2130        private final ArrayList<TFileIndexEntry> index;
2131        private final ArrayList<Long> recordNumIndex;
2132        private final BytesComparator comparator;
2133        private long sum = 0;
2134        
2135        /**
2136         * For reading from file.
2137         * 
2138         * @throws IOException
2139         */
2140        public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
2141            throws IOException {
2142          index = new ArrayList<TFileIndexEntry>(entryCount);
2143          recordNumIndex = new ArrayList<Long>(entryCount);
2144          int size = Utils.readVInt(in); // size for the first key entry.
2145          if (size > 0) {
2146            byte[] buffer = new byte[size];
2147            in.readFully(buffer);
2148            DataInputStream firstKeyInputStream =
2149                new DataInputStream(new ByteArrayInputStream(buffer, 0, size));
2150    
2151            int firstKeyLength = Utils.readVInt(firstKeyInputStream);
2152            firstKey = new ByteArray(new byte[firstKeyLength]);
2153            firstKeyInputStream.readFully(firstKey.buffer());
2154    
2155            for (int i = 0; i < entryCount; i++) {
2156              size = Utils.readVInt(in);
2157              if (buffer.length < size) {
2158                buffer = new byte[size];
2159              }
2160              in.readFully(buffer, 0, size);
2161              TFileIndexEntry idx =
2162                  new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
2163                      buffer, 0, size)));
2164              index.add(idx);
2165              sum += idx.entries();
2166              recordNumIndex.add(sum);
2167            }
2168          } else {
2169            if (entryCount != 0) {
2170              throw new RuntimeException("Internal error");
2171            }
2172          }
2173          this.comparator = comparator;
2174        }
2175    
2176        /**
2177         * @param key
2178         *          input key.
2179         * @return the ID of the first block that contains key >= input key. Or -1
2180         *         if no such block exists.
2181         */
2182        public int lowerBound(RawComparable key) {
2183          if (comparator == null) {
2184            throw new RuntimeException("Cannot search in unsorted TFile");
2185          }
2186    
2187          if (firstKey == null) {
2188            return -1; // not found
2189          }
2190    
2191          int ret = Utils.lowerBound(index, key, comparator);
2192          if (ret == index.size()) {
2193            return -1;
2194          }
2195          return ret;
2196        }
2197    
2198        /**
2199         * @param key
2200         *          input key.
2201         * @return the ID of the first block that contains key > input key. Or -1
2202         *         if no such block exists.
2203         */
2204        public int upperBound(RawComparable key) {
2205          if (comparator == null) {
2206            throw new RuntimeException("Cannot search in unsorted TFile");
2207          }
2208    
2209          if (firstKey == null) {
2210            return -1; // not found
2211          }
2212    
2213          int ret = Utils.upperBound(index, key, comparator);
2214          if (ret == index.size()) {
2215            return -1;
2216          }
2217          return ret;
2218        }
2219    
2220        /**
2221         * For writing to file.
2222         */
2223        public TFileIndex(BytesComparator comparator) {
2224          index = new ArrayList<TFileIndexEntry>();
2225          recordNumIndex = new ArrayList<Long>();
2226          this.comparator = comparator;
2227        }
2228    
2229        public RawComparable getFirstKey() {
2230          return firstKey;
2231        }
2232        
2233        public Reader.Location getLocationByRecordNum(long recNum) {
2234          int idx = Utils.upperBound(recordNumIndex, recNum);
2235          long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1);
2236          return new Reader.Location(idx, recNum-lastRecNum);
2237        }
2238    
2239        public long getRecordNumByLocation(Reader.Location location) {
2240          int blkIndex = location.getBlockIndex();
2241          long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1);
2242          return lastRecNum + location.getRecordIndex();
2243        }
2244        
2245        public void setFirstKey(byte[] key, int offset, int length) {
2246          firstKey = new ByteArray(new byte[length]);
2247          System.arraycopy(key, offset, firstKey.buffer(), 0, length);
2248        }
2249    
2250        public RawComparable getLastKey() {
2251          if (index.size() == 0) {
2252            return null;
2253          }
2254          return new ByteArray(index.get(index.size() - 1).buffer());
2255        }
2256    
2257        public void addEntry(TFileIndexEntry keyEntry) {
2258          index.add(keyEntry);
2259          sum += keyEntry.entries();
2260          recordNumIndex.add(sum);
2261        }
2262    
2263        public TFileIndexEntry getEntry(int bid) {
2264          return index.get(bid);
2265        }
2266    
2267        public void write(DataOutput out) throws IOException {
2268          if (firstKey == null) {
2269            Utils.writeVInt(out, 0);
2270            return;
2271          }
2272    
2273          DataOutputBuffer dob = new DataOutputBuffer();
2274          Utils.writeVInt(dob, firstKey.size());
2275          dob.write(firstKey.buffer());
2276          Utils.writeVInt(out, dob.size());
2277          out.write(dob.getData(), 0, dob.getLength());
2278    
2279          for (TFileIndexEntry entry : index) {
2280            dob.reset();
2281            entry.write(dob);
2282            Utils.writeVInt(out, dob.getLength());
2283            out.write(dob.getData(), 0, dob.getLength());
2284          }
2285        }
2286      }
2287    
2288      /**
2289       * TFile Data Index entry. We should try to make the memory footprint of each
2290       * index entry as small as possible.
2291       */
2292      static final class TFileIndexEntry implements RawComparable {
2293        final byte[] key;
2294        // count of <key, value> entries in the block.
2295        final long kvEntries;
2296    
2297        public TFileIndexEntry(DataInput in) throws IOException {
2298          int len = Utils.readVInt(in);
2299          key = new byte[len];
2300          in.readFully(key, 0, len);
2301          kvEntries = Utils.readVLong(in);
2302        }
2303    
2304        // default entry, without any padding
2305        public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) {
2306          key = new byte[len];
2307          System.arraycopy(newkey, offset, key, 0, len);
2308          this.kvEntries = entries;
2309        }
2310    
2311        @Override
2312        public byte[] buffer() {
2313          return key;
2314        }
2315    
2316        @Override
2317        public int offset() {
2318          return 0;
2319        }
2320    
2321        @Override
2322        public int size() {
2323          return key.length;
2324        }
2325    
2326        long entries() {
2327          return kvEntries;
2328        }
2329    
2330        public void write(DataOutput out) throws IOException {
2331          Utils.writeVInt(out, key.length);
2332          out.write(key, 0, key.length);
2333          Utils.writeVLong(out, kvEntries);
2334        }
2335      }
2336    
2337      /**
2338       * Dumping the TFile information.
2339       * 
2340       * @param args
2341       *          A list of TFile paths.
2342       */
2343      public static void main(String[] args) {
2344        System.out.printf("TFile Dumper (TFile %s, BCFile %s)\n", TFile.API_VERSION
2345            .toString(), BCFile.API_VERSION.toString());
2346        if (args.length == 0) {
2347          System.out
2348              .println("Usage: java ... org.apache.hadoop.io.file.tfile.TFile tfile-path [tfile-path ...]");
2349          System.exit(0);
2350        }
2351        Configuration conf = new Configuration();
2352    
2353        for (String file : args) {
2354          System.out.println("===" + file + "===");
2355          try {
2356            TFileDumper.dumpInfo(file, System.out, conf);
2357          } catch (IOException e) {
2358            e.printStackTrace(System.err);
2359          }
2360        }
2361      }
2362    }