001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 * 
009 * http://www.apache.org/licenses/LICENSE-2.0
010 * 
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations under
015 * the License.
016 */
017
018package org.apache.hadoop.io.file.tfile;
019
020import java.io.ByteArrayInputStream;
021import java.io.Closeable;
022import java.io.DataInput;
023import java.io.DataInputStream;
024import java.io.DataOutput;
025import java.io.DataOutputStream;
026import java.io.EOFException;
027import java.io.IOException;
028import java.io.OutputStream;
029import java.util.ArrayList;
030import java.util.Comparator;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.apache.hadoop.classification.InterfaceAudience;
035import org.apache.hadoop.classification.InterfaceStability;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataInputStream;
038import org.apache.hadoop.fs.FSDataOutputStream;
039import org.apache.hadoop.io.BoundedByteArrayOutputStream;
040import org.apache.hadoop.io.BytesWritable;
041import org.apache.hadoop.io.DataInputBuffer;
042import org.apache.hadoop.io.DataOutputBuffer;
043import org.apache.hadoop.io.IOUtils;
044import org.apache.hadoop.io.RawComparator;
045import org.apache.hadoop.io.WritableComparator;
046import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader;
047import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender;
048import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder;
049import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder;
050import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator;
051import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator;
052import org.apache.hadoop.io.file.tfile.Utils.Version;
053import org.apache.hadoop.io.serializer.JavaSerializationComparator;
054
055/**
056 * A TFile is a container of key-value pairs. Both keys and values are type-less
057 * bytes. Keys are restricted to 64KB, value length is not restricted
058 * (practically limited to the available disk storage). TFile further provides
059 * the following features:
060 * <ul>
061 * <li>Block Compression.
062 * <li>Named meta data blocks.
063 * <li>Sorted or unsorted keys.
064 * <li>Seek by key or by file offset.
065 * </ul>
066 * The memory footprint of a TFile includes the following:
067 * <ul>
068 * <li>Some constant overhead of reading or writing a compressed block.
069 * <ul>
070 * <li>Each compressed block requires one compression/decompression codec for
071 * I/O.
072 * <li>Temporary space to buffer the key.
073 * <li>Temporary space to buffer the value (for TFile.Writer only). Values are
074 * chunk encoded, so that we buffer at most one chunk of user data. By default,
075 * the chunk buffer is 1MB. Reading chunked value does not require additional
076 * memory.
077 * </ul>
078 * <li>TFile index, which is proportional to the total number of Data Blocks.
079 * The total amount of memory needed to hold the index can be estimated as
080 * (56+AvgKeySize)*NumBlocks.
 * <li>MetaBlock index, which is proportional to the total number of Meta
 * Blocks. The total amount of memory needed to hold the index for Meta Blocks
 * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock.
084 * </ul>
085 * <p>
086 * The behavior of TFile can be customized by the following variables through
087 * Configuration:
088 * <ul>
 * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes). Defaults
 * to 1MB. Values shorter than the chunk size are guaranteed to have a known
 * value length at read time (See
 * {@link TFile.Reader.Scanner.Entry#isValueLengthKnown()}).
093 * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for
094 * FSDataOutputStream. Integer (in bytes). Default to 256KB.
095 * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for
096 * FSDataInputStream. Integer (in bytes). Default to 256KB.
097 * </ul>
098 * <p>
099 * Suggestions on performance optimization.
100 * <ul>
101 * <li>Minimum block size. We recommend a setting of minimum block size between
102 * 256KB to 1MB for general usage. Larger block size is preferred if files are
103 * primarily for sequential access. However, it would lead to inefficient random
104 * access (because there are more data to decompress). Smaller blocks are good
105 * for random access, but require more memory to hold the block index, and may
106 * be slower to create (because we must flush the compressor stream at the
107 * conclusion of each data block, which leads to an FS I/O flush). Further, due
108 * to the internal caching in Compression codec, the smallest possible block
109 * size would be around 20KB-30KB.
110 * <li>The current implementation does not offer true multi-threading for
111 * reading. The implementation uses FSDataInputStream seek()+read(), which is
112 * shown to be much faster than positioned-read call in single thread mode.
113 * However, it also means that if multiple threads attempt to access the same
114 * TFile (using multiple scanners) simultaneously, the actual I/O is carried out
115 * sequentially even if they access different DFS blocks.
 * <li>Compression codec. Use "none" if the data is not very compressible (by
 * compressible, I mean a compression ratio at least 2:1). Generally, use "lzo"
 * as the starting point for experimenting. "gz" offers slightly better
 * compression ratio over "lzo" but requires 4x CPU to compress and 2x CPU to
 * decompress, compared to "lzo".
121 * <li>File system buffering, if the underlying FSDataInputStream and
122 * FSDataOutputStream is already adequately buffered; or if applications
123 * reads/writes keys and values in large buffers, we can reduce the sizes of
124 * input/output buffering in TFile layer by setting the configuration parameters
125 * "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size".
126 * </ul>
127 * 
128 * Some design rationale behind TFile can be found at <a
129 * href=https://issues.apache.org/jira/browse/HADOOP-3315>Hadoop-3315</a>.
130 */
131@InterfaceAudience.Public
132@InterfaceStability.Evolving
133public class TFile {
  // Logger for TFile; Writer.close() passes it to IOUtils.cleanup when
  // releasing resources.
  static final Log LOG = LogFactory.getLog(TFile.class);

  // Configuration key for the value chunk buffer size (see
  // getChunkBufferSize).
  private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size";
  // Configuration key for the FS input buffer size (see
  // getFSInputBufferSize).
  private static final String FS_INPUT_BUF_SIZE_ATTR =
      "tfile.fs.input.buffer.size";
  // Configuration key for the FS output buffer size (see
  // getFSOutputBufferSize).
  private static final String FS_OUTPUT_BUF_SIZE_ATTR =
      "tfile.fs.output.buffer.size";
141
142  static int getChunkBufferSize(Configuration conf) {
143    int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024);
144    return (ret > 0) ? ret : 1024 * 1024;
145  }
146
147  static int getFSInputBufferSize(Configuration conf) {
148    return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
149  }
150
151  static int getFSOutputBufferSize(Configuration conf) {
152    return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
153  }
154
  // Capacity of the Writer's key buffers: 64KB.
  private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB
  // TFile API version, constructed as Version(1, 0).
  static final Version API_VERSION = new Version((short) 1, (short) 0);

  /** compression: gzip */
  public static final String COMPRESSION_GZ = "gz";
  /** compression: lzo */
  public static final String COMPRESSION_LZO = "lzo";
  /** compression: none */
  public static final String COMPRESSION_NONE = "none";
  /** comparator: memcmp */
  public static final String COMPARATOR_MEMCMP = "memcmp";
  /** comparator prefix: java class */
  public static final String COMPARATOR_JCLASS = "jclass:";
168
169  /**
170   * Make a raw comparator from a string name.
171   * 
172   * @param name
173   *          Comparator name
174   * @return A RawComparable comparator.
175   */
176  static public Comparator<RawComparable> makeComparator(String name) {
177    return TFileMeta.makeComparator(name);
178  }
179
180  // Prevent the instantiation of TFiles
181  private TFile() {
182    // nothing
183  }
184
185  /**
186   * Get names of supported compression algorithms. The names are acceptable by
187   * TFile.Writer.
188   * 
189   * @return Array of strings, each represents a supported compression
190   *         algorithm. Currently, the following compression algorithms are
191   *         supported.
192   *         <ul>
193   *         <li>"none" - No compression.
194   *         <li>"lzo" - LZO compression.
195   *         <li>"gz" - GZIP compression.
196   *         </ul>
197   */
198  public static String[] getSupportedCompressionAlgorithms() {
199    return Compression.getSupportedAlgorithms();
200  }
201
202  /**
203   * TFile Writer.
204   */
205  @InterfaceStability.Evolving
206  public static class Writer implements Closeable {
    // minimum compressed size for a block. finishDataBlock() closes the
    // current data block once its compressed size reaches this value.
    private final int sizeMinBlock;

    // Meta blocks. Both are written out by close().
    final TFileIndex tfileIndex;
    final TFileMeta tfileMeta;

    // reference to the underlying BCFile. Set to null by close().
    private BCFile.Writer writerBCF;

    // current data block appender. Null when no data block is open.
    BlockAppender blkAppender;
    // number of records appended to the current data block.
    long blkRecordCount;

    // buffers for caching the key. currentKeyBufferOS receives the key being
    // written; lastKeyBufferOS holds the previously completed key. The two
    // references are swapped each time a key stream closes successfully.
    BoundedByteArrayOutputStream currentKeyBufferOS;
    BoundedByteArrayOutputStream lastKeyBufferOS;

    // buffer used by chunk codec. Allocated lazily by prepareAppendValue()
    // the first time a value of unknown length is requested.
    private byte[] valueBuffer;

    /**
     * Writer states. The state always transits in circles: READY -> IN_KEY ->
     * END_KEY -> IN_VALUE -> READY.
     */
    private enum State {
      READY, // Ready to start a new key-value pair insertion.
      IN_KEY, // In the middle of key insertion.
      END_KEY, // Key insertion complete, ready to insert value.
      IN_VALUE, // In value insertion.
      // ERROR, // Error encountered, cannot continue.
      CLOSED, // TFile already closed.
    };

    // current state of Writer.
    State state = State.READY;
    // configuration passed to the constructor; read for the chunk buffer
    // size.
    Configuration conf;
    // Incremented before each guarded operation and decremented only when it
    // succeeds, so it stays non-zero after a failure. close() performs the
    // normal finish sequence only when this is zero.
    long errorCount = 0;
245
246    /**
247     * Constructor
248     * 
249     * @param fsdos
250     *          output stream for writing. Must be at position 0.
251     * @param minBlockSize
252     *          Minimum compressed block size in bytes. A compression block will
253     *          not be closed until it reaches this size except for the last
254     *          block.
255     * @param compressName
256     *          Name of the compression algorithm. Must be one of the strings
257     *          returned by {@link TFile#getSupportedCompressionAlgorithms()}.
258     * @param comparator
259     *          Leave comparator as null or empty string if TFile is not sorted.
260     *          Otherwise, provide the string name for the comparison algorithm
261     *          for keys. Two kinds of comparators are supported.
262     *          <ul>
263     *          <li>Algorithmic comparator: binary comparators that is language
264     *          independent. Currently, only "memcmp" is supported.
265     *          <li>Language-specific comparator: binary comparators that can
266     *          only be constructed in specific language. For Java, the syntax
267     *          is "jclass:", followed by the class name of the RawComparator.
268     *          Currently, we only support RawComparators that can be
269     *          constructed through the default constructor (with no
270     *          parameters). Parameterized RawComparators such as
271     *          {@link WritableComparator} or
272     *          {@link JavaSerializationComparator} may not be directly used.
273     *          One should write a wrapper class that inherits from such classes
274     *          and use its default constructor to perform proper
275     *          initialization.
276     *          </ul>
277     * @param conf
278     *          The configuration object.
279     * @throws IOException
280     */
281    public Writer(FSDataOutputStream fsdos, int minBlockSize,
282        String compressName, String comparator, Configuration conf)
283        throws IOException {
284      sizeMinBlock = minBlockSize;
285      tfileMeta = new TFileMeta(comparator);
286      tfileIndex = new TFileIndex(tfileMeta.getComparator());
287
288      writerBCF = new BCFile.Writer(fsdos, compressName, conf);
289      currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
290      lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
291      this.conf = conf;
292    }
293
294    /**
295     * Close the Writer. Resources will be released regardless of the exceptions
296     * being thrown. Future close calls will have no effect.
297     * 
298     * The underlying FSDataOutputStream is not closed.
299     */
300    public void close() throws IOException {
301      if ((state == State.CLOSED)) {
302        return;
303      }
304      try {
305        // First try the normal finish.
306        // Terminate upon the first Exception.
307        if (errorCount == 0) {
308          if (state != State.READY) {
309            throw new IllegalStateException(
310                "Cannot close TFile in the middle of key-value insertion.");
311          }
312
313          finishDataBlock(true);
314
315          // first, write out data:TFile.meta
316          BlockAppender outMeta =
317              writerBCF
318                  .prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE);
319          try {
320            tfileMeta.write(outMeta);
321          } finally {
322            outMeta.close();
323          }
324
325          // second, write out data:TFile.index
326          BlockAppender outIndex =
327              writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME);
328          try {
329            tfileIndex.write(outIndex);
330          } finally {
331            outIndex.close();
332          }
333
334          writerBCF.close();
335        }
336      } finally {
337        IOUtils.cleanup(LOG, blkAppender, writerBCF);
338        blkAppender = null;
339        writerBCF = null;
340        state = State.CLOSED;
341      }
342    }
343
344    /**
345     * Adding a new key-value pair to the TFile. This is synonymous to
346     * append(key, 0, key.length, value, 0, value.length)
347     * 
348     * @param key
349     *          Buffer for key.
350     * @param value
351     *          Buffer for value.
352     * @throws IOException
353     */
354    public void append(byte[] key, byte[] value) throws IOException {
355      append(key, 0, key.length, value, 0, value.length);
356    }
357
358    /**
359     * Adding a new key-value pair to TFile.
360     * 
361     * @param key
362     *          buffer for key.
363     * @param koff
364     *          offset in key buffer.
365     * @param klen
366     *          length of key.
367     * @param value
368     *          buffer for value.
369     * @param voff
370     *          offset in value buffer.
371     * @param vlen
372     *          length of value.
373     * @throws IOException
374     *           Upon IO errors.
375     *           <p>
376     *           If an exception is thrown, the TFile will be in an inconsistent
377     *           state. The only legitimate call after that would be close
378     */
379    public void append(byte[] key, int koff, int klen, byte[] value, int voff,
380        int vlen) throws IOException {
381      if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) {
382        throw new IndexOutOfBoundsException(
383            "Bad key buffer offset-length combination.");
384      }
385
386      if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) {
387        throw new IndexOutOfBoundsException(
388            "Bad value buffer offset-length combination.");
389      }
390
391      try {
392        DataOutputStream dosKey = prepareAppendKey(klen);
393        try {
394          ++errorCount;
395          dosKey.write(key, koff, klen);
396          --errorCount;
397        } finally {
398          dosKey.close();
399        }
400
401        DataOutputStream dosValue = prepareAppendValue(vlen);
402        try {
403          ++errorCount;
404          dosValue.write(value, voff, vlen);
405          --errorCount;
406        } finally {
407          dosValue.close();
408        }
409      } finally {
410        state = State.READY;
411      }
412    }
413
414    /**
415     * Helper class to register key after close call on key append stream.
416     */
417    private class KeyRegister extends DataOutputStream {
418      private final int expectedLength;
419      private boolean closed = false;
420
421      public KeyRegister(int len) {
422        super(currentKeyBufferOS);
423        if (len >= 0) {
424          currentKeyBufferOS.reset(len);
425        } else {
426          currentKeyBufferOS.reset();
427        }
428        expectedLength = len;
429      }
430
431      @Override
432      public void close() throws IOException {
433        if (closed == true) {
434          return;
435        }
436
437        try {
438          ++errorCount;
439          byte[] key = currentKeyBufferOS.getBuffer();
440          int len = currentKeyBufferOS.size();
441          /**
442           * verify length.
443           */
444          if (expectedLength >= 0 && expectedLength != len) {
445            throw new IOException("Incorrect key length: expected="
446                + expectedLength + " actual=" + len);
447          }
448
449          Utils.writeVInt(blkAppender, len);
450          blkAppender.write(key, 0, len);
451          if (tfileIndex.getFirstKey() == null) {
452            tfileIndex.setFirstKey(key, 0, len);
453          }
454
455          if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
456            byte[] lastKey = lastKeyBufferOS.getBuffer();
457            int lastLen = lastKeyBufferOS.size();
458            if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
459                lastLen) < 0) {
460              throw new IOException("Keys are not added in sorted order");
461            }
462          }
463
464          BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
465          currentKeyBufferOS = lastKeyBufferOS;
466          lastKeyBufferOS = tmp;
467          --errorCount;
468        } finally {
469          closed = true;
470          state = State.END_KEY;
471        }
472      }
473    }
474
475    /**
476     * Helper class to register value after close call on value append stream.
477     */
478    private class ValueRegister extends DataOutputStream {
479      private boolean closed = false;
480
481      public ValueRegister(OutputStream os) {
482        super(os);
483      }
484
485      // Avoiding flushing call to down stream.
486      @Override
487      public void flush() {
488        // do nothing
489      }
490
491      @Override
492      public void close() throws IOException {
493        if (closed == true) {
494          return;
495        }
496
497        try {
498          ++errorCount;
499          super.close();
500          blkRecordCount++;
501          // bump up the total record count in the whole file
502          tfileMeta.incRecordCount();
503          finishDataBlock(false);
504          --errorCount;
505        } finally {
506          closed = true;
507          state = State.READY;
508        }
509      }
510    }
511
512    /**
513     * Obtain an output stream for writing a key into TFile. This may only be
514     * called when there is no active Key appending stream or value appending
515     * stream.
516     * 
517     * @param length
518     *          The expected length of the key. If length of the key is not
519     *          known, set length = -1. Otherwise, the application must write
520     *          exactly as many bytes as specified here before calling close on
521     *          the returned output stream.
522     * @return The key appending output stream.
523     * @throws IOException
524     * 
525     */
526    public DataOutputStream prepareAppendKey(int length) throws IOException {
527      if (state != State.READY) {
528        throw new IllegalStateException("Incorrect state to start a new key: "
529            + state.name());
530      }
531
532      initDataBlock();
533      DataOutputStream ret = new KeyRegister(length);
534      state = State.IN_KEY;
535      return ret;
536    }
537
538    /**
539     * Obtain an output stream for writing a value into TFile. This may only be
540     * called right after a key appending operation (the key append stream must
541     * be closed).
542     * 
543     * @param length
544     *          The expected length of the value. If length of the value is not
545     *          known, set length = -1. Otherwise, the application must write
546     *          exactly as many bytes as specified here before calling close on
547     *          the returned output stream. Advertising the value size up-front
548     *          guarantees that the value is encoded in one chunk, and avoids
549     *          intermediate chunk buffering.
550     * @throws IOException
551     * 
552     */
553    public DataOutputStream prepareAppendValue(int length) throws IOException {
554      if (state != State.END_KEY) {
555        throw new IllegalStateException(
556            "Incorrect state to start a new value: " + state.name());
557      }
558
559      DataOutputStream ret;
560
561      // unknown length
562      if (length < 0) {
563        if (valueBuffer == null) {
564          valueBuffer = new byte[getChunkBufferSize(conf)];
565        }
566        ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer));
567      } else {
568        ret =
569            new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length));
570      }
571
572      state = State.IN_VALUE;
573      return ret;
574    }
575
576    /**
577     * Obtain an output stream for creating a meta block. This function may not
578     * be called when there is a key append stream or value append stream
579     * active. No more key-value insertion is allowed after a meta data block
580     * has been added to TFile.
581     * 
582     * @param name
583     *          Name of the meta block.
584     * @param compressName
585     *          Name of the compression algorithm to be used. Must be one of the
586     *          strings returned by
587     *          {@link TFile#getSupportedCompressionAlgorithms()}.
588     * @return A DataOutputStream that can be used to write Meta Block data.
589     *         Closing the stream would signal the ending of the block.
590     * @throws IOException
591     * @throws MetaBlockAlreadyExists
592     *           the Meta Block with the same name already exists.
593     */
594    public DataOutputStream prepareMetaBlock(String name, String compressName)
595        throws IOException, MetaBlockAlreadyExists {
596      if (state != State.READY) {
597        throw new IllegalStateException(
598            "Incorrect state to start a Meta Block: " + state.name());
599      }
600
601      finishDataBlock(true);
602      DataOutputStream outputStream =
603          writerBCF.prepareMetaBlock(name, compressName);
604      return outputStream;
605    }
606
607    /**
608     * Obtain an output stream for creating a meta block. This function may not
609     * be called when there is a key append stream or value append stream
610     * active. No more key-value insertion is allowed after a meta data block
611     * has been added to TFile. Data will be compressed using the default
612     * compressor as defined in Writer's constructor.
613     * 
614     * @param name
615     *          Name of the meta block.
616     * @return A DataOutputStream that can be used to write Meta Block data.
617     *         Closing the stream would signal the ending of the block.
618     * @throws IOException
619     * @throws MetaBlockAlreadyExists
620     *           the Meta Block with the same name already exists.
621     */
622    public DataOutputStream prepareMetaBlock(String name) throws IOException,
623        MetaBlockAlreadyExists {
624      if (state != State.READY) {
625        throw new IllegalStateException(
626            "Incorrect state to start a Meta Block: " + state.name());
627      }
628
629      finishDataBlock(true);
630      return writerBCF.prepareMetaBlock(name);
631    }
632
633    /**
634     * Check if we need to start a new data block.
635     * 
636     * @throws IOException
637     */
638    private void initDataBlock() throws IOException {
639      // for each new block, get a new appender
640      if (blkAppender == null) {
641        blkAppender = writerBCF.prepareDataBlock();
642      }
643    }
644
645    /**
646     * Close the current data block if necessary.
647     * 
648     * @param bForceFinish
649     *          Force the closure regardless of the block size.
650     * @throws IOException
651     */
652    void finishDataBlock(boolean bForceFinish) throws IOException {
653      if (blkAppender == null) {
654        return;
655      }
656
657      // exceeded the size limit, do the compression and finish the block
658      if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) {
659        // keep tracks of the last key of each data block, no padding
660        // for now
661        TFileIndexEntry keyLast =
662            new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS
663                .size(), blkRecordCount);
664        tfileIndex.addEntry(keyLast);
665        // close the appender
666        blkAppender.close();
667        blkAppender = null;
668        blkRecordCount = 0;
669      }
670    }
671  }
672
673  /**
   * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner
   * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}
   * ), a portion of TFile based on byte offsets (
   * {@link Reader#createScannerByByteRange(long, long)}), or a portion of TFile
   * whose keys fall in a certain key range (for sorted TFile only,
   * {@link Reader#createScannerByKey(byte[], byte[])} or
   * {@link Reader#createScannerByKey(RawComparable, RawComparable)}).
681   */
682  @InterfaceStability.Evolving
683  public static class Reader implements Closeable {
    // The underlying BCFile reader.
    final BCFile.Reader readerBCF;

    // TFile index, it is loaded lazily (by checkTFileDataIndex()).
    TFileIndex tfileIndex = null;
    // TFile meta, read from its meta block in the constructor.
    final TFileMeta tfileMeta;
    // Key comparator, taken from tfileMeta in the constructor.
    final BytesComparator comparator;

    // global begin and end locations. The constructor sets begin to
    // (block 0, record 0) and end to (block count, record 0).
    private final Location begin;
    private final Location end;
695
696    /**
697     * Location representing a virtual position in the TFile.
698     */
699    static final class Location implements Comparable<Location>, Cloneable {
700      private int blockIndex;
701      // distance/offset from the beginning of the block
702      private long recordIndex;
703
704      Location(int blockIndex, long recordIndex) {
705        set(blockIndex, recordIndex);
706      }
707
708      void incRecordIndex() {
709        ++recordIndex;
710      }
711
712      Location(Location other) {
713        set(other);
714      }
715
716      int getBlockIndex() {
717        return blockIndex;
718      }
719
720      long getRecordIndex() {
721        return recordIndex;
722      }
723
724      void set(int blockIndex, long recordIndex) {
725        if ((blockIndex | recordIndex) < 0) {
726          throw new IllegalArgumentException(
727              "Illegal parameter for BlockLocation.");
728        }
729        this.blockIndex = blockIndex;
730        this.recordIndex = recordIndex;
731      }
732
733      void set(Location other) {
734        set(other.blockIndex, other.recordIndex);
735      }
736
737      /**
738       * @see java.lang.Comparable#compareTo(java.lang.Object)
739       */
740      @Override
741      public int compareTo(Location other) {
742        return compareTo(other.blockIndex, other.recordIndex);
743      }
744
745      int compareTo(int bid, long rid) {
746        if (this.blockIndex == bid) {
747          long ret = this.recordIndex - rid;
748          if (ret > 0) return 1;
749          if (ret < 0) return -1;
750          return 0;
751        }
752        return this.blockIndex - bid;
753      }
754
755      /**
756       * @see java.lang.Object#clone()
757       */
758      @Override
759      protected Location clone() {
760        return new Location(blockIndex, recordIndex);
761      }
762
763      /**
764       * @see java.lang.Object#hashCode()
765       */
766      @Override
767      public int hashCode() {
768        final int prime = 31;
769        int result = prime + blockIndex;
770        result = (int) (prime * result + recordIndex);
771        return result;
772      }
773
774      /**
775       * @see java.lang.Object#equals(java.lang.Object)
776       */
777      @Override
778      public boolean equals(Object obj) {
779        if (this == obj) return true;
780        if (obj == null) return false;
781        if (getClass() != obj.getClass()) return false;
782        Location other = (Location) obj;
783        if (blockIndex != other.blockIndex) return false;
784        if (recordIndex != other.recordIndex) return false;
785        return true;
786      }
787    }
788
789    /**
790     * Constructor
791     * 
792     * @param fsdis
793     *          FS input stream of the TFile.
794     * @param fileLength
795     *          The length of TFile. This is required because we have no easy
796     *          way of knowing the actual size of the input file through the
797     *          File input stream.
798     * @param conf
799     * @throws IOException
800     */
801    public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf)
802        throws IOException {
803      readerBCF = new BCFile.Reader(fsdis, fileLength, conf);
804
805      // first, read TFile meta
806      BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME);
807      try {
808        tfileMeta = new TFileMeta(brMeta);
809      } finally {
810        brMeta.close();
811      }
812
813      comparator = tfileMeta.getComparator();
814      // Set begin and end locations.
815      begin = new Location(0, 0);
816      end = new Location(readerBCF.getBlockCount(), 0);
817    }
818
819    /**
820     * Close the reader. The state of the Reader object is undefined after
821     * close. Calling close() for multiple times has no effect.
822     */
823    public void close() throws IOException {
824      readerBCF.close();
825    }
826
827    /**
828     * Get the begin location of the TFile.
829     * 
830     * @return If TFile is not empty, the location of the first key-value pair.
831     *         Otherwise, it returns end().
832     */
833    Location begin() {
834      return begin;
835    }
836
837    /**
838     * Get the end location of the TFile.
839     * 
840     * @return The location right after the last key-value pair in TFile.
841     */
842    Location end() {
843      return end;
844    }
845
846    /**
847     * Get the string representation of the comparator.
848     * 
849     * @return If the TFile is not sorted by keys, an empty string will be
850     *         returned. Otherwise, the actual comparator string that is
851     *         provided during the TFile creation time will be returned.
852     */
853    public String getComparatorName() {
854      return tfileMeta.getComparatorString();
855    }
856
857    /**
858     * Is the TFile sorted?
859     * 
860     * @return true if TFile is sorted.
861     */
862    public boolean isSorted() {
863      return tfileMeta.isSorted();
864    }
865
866    /**
867     * Get the number of key-value pair entries in TFile.
868     * 
869     * @return the number of key-value pairs in TFile
870     */
871    public long getEntryCount() {
872      return tfileMeta.getRecordCount();
873    }
874
875    /**
876     * Lazily loading the TFile index.
877     * 
878     * @throws IOException
879     */
880    synchronized void checkTFileDataIndex() throws IOException {
881      if (tfileIndex == null) {
882        BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME);
883        try {
884          tfileIndex =
885              new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta
886                  .getComparator());
887        } finally {
888          brIndex.close();
889        }
890      }
891    }
892
893    /**
894     * Get the first key in the TFile.
895     * 
896     * @return The first key in the TFile.
897     * @throws IOException
898     */
899    public RawComparable getFirstKey() throws IOException {
900      checkTFileDataIndex();
901      return tfileIndex.getFirstKey();
902    }
903
904    /**
905     * Get the last key in the TFile.
906     * 
907     * @return The last key in the TFile.
908     * @throws IOException
909     */
910    public RawComparable getLastKey() throws IOException {
911      checkTFileDataIndex();
912      return tfileIndex.getLastKey();
913    }
914
915    /**
916     * Get a Comparator object to compare Entries. It is useful when you want
917     * stores the entries in a collection (such as PriorityQueue) and perform
918     * sorting or comparison among entries based on the keys without copying out
919     * the key.
920     * 
921     * @return An Entry Comparator..
922     */
923    public Comparator<Scanner.Entry> getEntryComparator() {
924      if (!isSorted()) {
925        throw new RuntimeException(
926            "Entries are not comparable for unsorted TFiles");
927      }
928
929      return new Comparator<Scanner.Entry>() {
930        /**
931         * Provide a customized comparator for Entries. This is useful if we
932         * have a collection of Entry objects. However, if the Entry objects
933         * come from different TFiles, users must ensure that those TFiles share
934         * the same RawComparator.
935         */
936        @Override
937        public int compare(Scanner.Entry o1, Scanner.Entry o2) {
938          return comparator.compare(o1.getKeyBuffer(), 0, o1.getKeyLength(), o2
939              .getKeyBuffer(), 0, o2.getKeyLength());
940        }
941      };
942    }
943
944    /**
945     * Get an instance of the RawComparator that is constructed based on the
946     * string comparator representation.
947     * 
948     * @return a Comparator that can compare RawComparable's.
949     */
950    public Comparator<RawComparable> getComparator() {
951      return comparator;
952    }
953
954    /**
955     * Stream access to a meta block.``
956     * 
957     * @param name
958     *          The name of the meta block.
959     * @return The input stream.
960     * @throws IOException
961     *           on I/O error.
962     * @throws MetaBlockDoesNotExist
963     *           If the meta block with the name does not exist.
964     */
965    public DataInputStream getMetaBlock(String name) throws IOException,
966        MetaBlockDoesNotExist {
967      return readerBCF.getMetaBlock(name);
968    }
969
970    /**
971     * if greater is true then returns the beginning location of the block
972     * containing the key strictly greater than input key. if greater is false
973     * then returns the beginning location of the block greater than equal to
974     * the input key
975     * 
976     * @param key
977     *          the input key
978     * @param greater
979     *          boolean flag
980     * @return
981     * @throws IOException
982     */
983    Location getBlockContainsKey(RawComparable key, boolean greater)
984        throws IOException {
985      if (!isSorted()) {
986        throw new RuntimeException("Seeking in unsorted TFile");
987      }
988      checkTFileDataIndex();
989      int blkIndex =
990          (greater) ? tfileIndex.upperBound(key) : tfileIndex.lowerBound(key);
991      if (blkIndex < 0) return end;
992      return new Location(blkIndex, 0);
993    }
994
995    Location getLocationByRecordNum(long recNum) throws IOException {
996      checkTFileDataIndex();
997      return tfileIndex.getLocationByRecordNum(recNum);
998    }
999
1000    long getRecordNumByLocation(Location location) throws IOException {
1001      checkTFileDataIndex();
1002      return tfileIndex.getRecordNumByLocation(location);      
1003    }
1004    
1005    int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
1006      if (!isSorted()) {
1007        throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
1008      }
1009      return comparator.compare(a, o1, l1, b, o2, l2);
1010    }
1011
1012    int compareKeys(RawComparable a, RawComparable b) {
1013      if (!isSorted()) {
1014        throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
1015      }
1016      return comparator.compare(a, b);
1017    }
1018
1019    /**
1020     * Get the location pointing to the beginning of the first key-value pair in
1021     * a compressed block whose byte offset in the TFile is greater than or
1022     * equal to the specified offset.
1023     * 
1024     * @param offset
1025     *          the user supplied offset.
1026     * @return the location to the corresponding entry; or end() if no such
1027     *         entry exists.
1028     */
1029    Location getLocationNear(long offset) {
1030      int blockIndex = readerBCF.getBlockIndexNear(offset);
1031      if (blockIndex == -1) return end;
1032      return new Location(blockIndex, 0);
1033    }
1034
1035    /**
1036     * Get the RecordNum for the first key-value pair in a compressed block
1037     * whose byte offset in the TFile is greater than or equal to the specified
1038     * offset.
1039     * 
1040     * @param offset
1041     *          the user supplied offset.
1042     * @return the RecordNum to the corresponding entry. If no such entry
1043     *         exists, it returns the total entry count.
1044     * @throws IOException
1045     */
1046    public long getRecordNumNear(long offset) throws IOException {
1047      return getRecordNumByLocation(getLocationNear(offset));
1048    }
1049    
1050    /**
1051     * Get a sample key that is within a block whose starting offset is greater
1052     * than or equal to the specified offset.
1053     * 
1054     * @param offset
1055     *          The file offset.
1056     * @return the key that fits the requirement; or null if no such key exists
1057     *         (which could happen if the offset is close to the end of the
1058     *         TFile).
1059     * @throws IOException
1060     */
1061    public RawComparable getKeyNear(long offset) throws IOException {
1062      int blockIndex = readerBCF.getBlockIndexNear(offset);
1063      if (blockIndex == -1) return null;
1064      checkTFileDataIndex();
1065      return new ByteArray(tfileIndex.getEntry(blockIndex).key);
1066    }
1067
1068    /**
1069     * Get a scanner than can scan the whole TFile.
1070     * 
1071     * @return The scanner object. A valid Scanner is always returned even if
1072     *         the TFile is empty.
1073     * @throws IOException
1074     */
1075    public Scanner createScanner() throws IOException {
1076      return new Scanner(this, begin, end);
1077    }
1078
1079    /**
1080     * Get a scanner that covers a portion of TFile based on byte offsets.
1081     * 
1082     * @param offset
1083     *          The beginning byte offset in the TFile.
1084     * @param length
1085     *          The length of the region.
1086     * @return The actual coverage of the returned scanner tries to match the
1087     *         specified byte-region but always round up to the compression
1088     *         block boundaries. It is possible that the returned scanner
1089     *         contains zero key-value pairs even if length is positive.
1090     * @throws IOException
1091     */
1092    public Scanner createScannerByByteRange(long offset, long length) throws IOException {
1093      return new Scanner(this, offset, offset + length);
1094    }
1095
1096    /**
1097     * Get a scanner that covers a portion of TFile based on keys.
1098     * 
1099     * @param beginKey
1100     *          Begin key of the scan (inclusive). If null, scan from the first
1101     *          key-value entry of the TFile.
1102     * @param endKey
1103     *          End key of the scan (exclusive). If null, scan up to the last
1104     *          key-value entry of the TFile.
1105     * @return The actual coverage of the returned scanner will cover all keys
1106     *         greater than or equal to the beginKey and less than the endKey.
1107     * @throws IOException
1108     * 
1109     * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead.
1110     */
1111    @Deprecated
1112    public Scanner createScanner(byte[] beginKey, byte[] endKey)
1113      throws IOException {
1114      return createScannerByKey(beginKey, endKey);
1115    }
1116    
1117    /**
1118     * Get a scanner that covers a portion of TFile based on keys.
1119     * 
1120     * @param beginKey
1121     *          Begin key of the scan (inclusive). If null, scan from the first
1122     *          key-value entry of the TFile.
1123     * @param endKey
1124     *          End key of the scan (exclusive). If null, scan up to the last
1125     *          key-value entry of the TFile.
1126     * @return The actual coverage of the returned scanner will cover all keys
1127     *         greater than or equal to the beginKey and less than the endKey.
1128     * @throws IOException
1129     */
1130    public Scanner createScannerByKey(byte[] beginKey, byte[] endKey)
1131        throws IOException {
1132      return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey,
1133          0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
1134          0, endKey.length));
1135    }
1136
1137    /**
1138     * Get a scanner that covers a specific key range.
1139     * 
1140     * @param beginKey
1141     *          Begin key of the scan (inclusive). If null, scan from the first
1142     *          key-value entry of the TFile.
1143     * @param endKey
1144     *          End key of the scan (exclusive). If null, scan up to the last
1145     *          key-value entry of the TFile.
1146     * @return The actual coverage of the returned scanner will cover all keys
1147     *         greater than or equal to the beginKey and less than the endKey.
1148     * @throws IOException
1149     * 
1150     * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)}
1151     *             instead.
1152     */
1153    @Deprecated
1154    public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
1155        throws IOException {
1156      return createScannerByKey(beginKey, endKey);
1157    }
1158
1159    /**
1160     * Get a scanner that covers a specific key range.
1161     * 
1162     * @param beginKey
1163     *          Begin key of the scan (inclusive). If null, scan from the first
1164     *          key-value entry of the TFile.
1165     * @param endKey
1166     *          End key of the scan (exclusive). If null, scan up to the last
1167     *          key-value entry of the TFile.
1168     * @return The actual coverage of the returned scanner will cover all keys
1169     *         greater than or equal to the beginKey and less than the endKey.
1170     * @throws IOException
1171     */
1172    public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey)
1173        throws IOException {
1174      if ((beginKey != null) && (endKey != null)
1175          && (compareKeys(beginKey, endKey) >= 0)) {
1176        return new Scanner(this, beginKey, beginKey);
1177      }
1178      return new Scanner(this, beginKey, endKey);
1179    }
1180
1181    /**
1182     * Create a scanner that covers a range of records.
1183     * 
1184     * @param beginRecNum
1185     *          The RecordNum for the first record (inclusive).
1186     * @param endRecNum
1187     *          The RecordNum for the last record (exclusive). To scan the whole
1188     *          file, either specify endRecNum==-1 or endRecNum==getEntryCount().
1189     * @return The TFile scanner that covers the specified range of records.
1190     * @throws IOException
1191     */
1192    public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum)
1193        throws IOException {
1194      if (beginRecNum < 0) beginRecNum = 0;
1195      if (endRecNum < 0 || endRecNum > getEntryCount()) {
1196        endRecNum = getEntryCount();
1197      }
1198      return new Scanner(this, getLocationByRecordNum(beginRecNum),
1199          getLocationByRecordNum(endRecNum));
1200    }
1201
1202    /**
1203     * The TFile Scanner. The Scanner has an implicit cursor, which, upon
1204     * creation, points to the first key-value pair in the scan range. If the
1205     * scan range is empty, the cursor will point to the end of the scan range.
1206     * <p>
1207     * Use {@link Scanner#atEnd()} to test whether the cursor is at the end
1208     * location of the scanner.
1209     * <p>
1210     * Use {@link Scanner#advance()} to move the cursor to the next key-value
1211     * pair (or end if none exists). Use seekTo methods (
1212     * {@link Scanner#seekTo(byte[])} or
1213     * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary
1214     * location in the covered range (including backward seeking). Use
1215     * {@link Scanner#rewind()} to seek back to the beginning of the scanner.
1216     * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner.
1217     * <p>
1218     * Actual keys and values may be obtained through {@link Scanner.Entry}
1219     * object, which is obtained through {@link Scanner#entry()}.
1220     */
1221    public static class Scanner implements Closeable {
      // The underlying TFile reader.
      final Reader reader;
      // current block (null if reaching end). Replaced by initBlock() and
      // closed/cleared by parkCursorAtEnd().
      private BlockReader blkReader;

      // Lower bound of the scan range; seekTo(Location) rejects locations
      // before it.
      Location beginLocation;
      // Upper bound (exclusive) of the scan range; the cursor is "at end" once
      // it reaches this location.
      Location endLocation;
      // The cursor: location of the entry the scanner currently points to.
      Location currentLocation;

      // flag to ensure value is only examined once. Reset to false each time
      // checkKey() reads a new key.
      boolean valueChecked = false;
      // reusable buffer for keys. Allocated with MAX_KEY_SIZE bytes.
      final byte[] keyBuffer;
      // length of key, -1 means key is invalid.
      int klen = -1;

      // Upper limit on the number of bytes moved per iteration when a value is
      // copied to an output stream.
      static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024;
      // Scratch buffer used while transferring a value to an output stream.
      BytesWritable valTransferBuffer;

      // NOTE(review): created in the constructor; its use is not visible in
      // this part of the file.
      DataInputBuffer keyDataInputStream;
      // Chunk decoder that is reset onto the current block reader after each
      // key is read, so that the value bytes can be read through it.
      ChunkDecoder valueBufferInputStream;
      // DataInputStream view over valueBufferInputStream.
      DataInputStream valueDataInputStream;
      // vlen == -1 if unknown.
      int vlen;
1246
1247      /**
1248       * Constructor
1249       * 
1250       * @param reader
1251       *          The TFile reader object.
1252       * @param offBegin
1253       *          Begin byte-offset of the scan.
1254       * @param offEnd
1255       *          End byte-offset of the scan.
1256       * @throws IOException
1257       * 
1258       *           The offsets will be rounded to the beginning of a compressed
1259       *           block whose offset is greater than or equal to the specified
1260       *           offset.
1261       */
1262      protected Scanner(Reader reader, long offBegin, long offEnd)
1263          throws IOException {
1264        this(reader, reader.getLocationNear(offBegin), reader
1265            .getLocationNear(offEnd));
1266      }
1267
1268      /**
1269       * Constructor
1270       * 
1271       * @param reader
1272       *          The TFile reader object.
1273       * @param begin
1274       *          Begin location of the scan.
1275       * @param end
1276       *          End location of the scan.
1277       * @throws IOException
1278       */
1279      Scanner(Reader reader, Location begin, Location end) throws IOException {
1280        this.reader = reader;
1281        // ensure the TFile index is loaded throughout the life of scanner.
1282        reader.checkTFileDataIndex();
1283        beginLocation = begin;
1284        endLocation = end;
1285
1286        valTransferBuffer = new BytesWritable();
1287        // TODO: remember the longest key in a TFile, and use it to replace
1288        // MAX_KEY_SIZE.
1289        keyBuffer = new byte[MAX_KEY_SIZE];
1290        keyDataInputStream = new DataInputBuffer();
1291        valueBufferInputStream = new ChunkDecoder();
1292        valueDataInputStream = new DataInputStream(valueBufferInputStream);
1293
1294        if (beginLocation.compareTo(endLocation) >= 0) {
1295          currentLocation = new Location(endLocation);
1296        } else {
1297          currentLocation = new Location(0, 0);
1298          initBlock(beginLocation.getBlockIndex());
1299          inBlockAdvance(beginLocation.getRecordIndex());
1300        }
1301      }
1302
      /**
       * Creates a scanner over a key range.
       * <p>
       * The scanner is first built over a block-aligned range ending at the
       * reader's end location; the begin and end locations are then narrowed
       * to the exact records by seeking to the keys.
       * 
       * @param reader
       *          The TFile reader object.
       * @param beginKey
       *          Begin key of the scan. If null, scan from the first
       *          &lt;K,V&gt; entry of the TFile.
       * @param endKey
       *          End key of the scan. If null, scan up to the last
       *          &lt;K,V&gt; entry of the TFile.
       * @throws IOException
       *           if seeking to either key fails.
       */
      protected Scanner(Reader reader, RawComparable beginKey,
          RawComparable endKey) throws IOException {
        // Start from the beginning of the block selected for beginKey (or the
        // reader's begin location) and extend to the reader's end location.
        this(reader, (beginKey == null) ? reader.begin() : reader
            .getBlockContainsKey(beginKey, false), reader.end());
        if (beginKey != null) {
          // Move the cursor within the block according to beginKey and record
          // the resulting cursor position as the begin location.
          inBlockAdvance(beginKey, false);
          beginLocation.set(currentLocation);
        }
        if (endKey != null) {
          // Seek to endKey, record that position as the end location, then
          // put the cursor back at the begin location.
          // NOTE(review): set() mutates the Location object held in
          // endLocation in place; confirm it is not shared with the Reader.
          seekTo(endKey, false);
          endLocation.set(currentLocation);
          seekTo(beginLocation);
        }
      }
1330
1331      /**
1332       * Move the cursor to the first entry whose key is greater than or equal
1333       * to the input key. Synonymous to seekTo(key, 0, key.length). The entry
1334       * returned by the previous entry() call will be invalid.
1335       * 
1336       * @param key
1337       *          The input key
1338       * @return true if we find an equal key.
1339       * @throws IOException
1340       */
1341      public boolean seekTo(byte[] key) throws IOException {
1342        return seekTo(key, 0, key.length);
1343      }
1344
1345      /**
1346       * Move the cursor to the first entry whose key is greater than or equal
1347       * to the input key. The entry returned by the previous entry() call will
1348       * be invalid.
1349       * 
1350       * @param key
1351       *          The input key
1352       * @param keyOffset
1353       *          offset in the key buffer.
1354       * @param keyLen
1355       *          key buffer length.
1356       * @return true if we find an equal key; false otherwise.
1357       * @throws IOException
1358       */
1359      public boolean seekTo(byte[] key, int keyOffset, int keyLen)
1360          throws IOException {
1361        return seekTo(new ByteArray(key, keyOffset, keyLen), false);
1362      }
1363
1364      private boolean seekTo(RawComparable key, boolean beyond)
1365          throws IOException {
1366        Location l = reader.getBlockContainsKey(key, beyond);
1367        if (l.compareTo(beginLocation) < 0) {
1368          l = beginLocation;
1369        } else if (l.compareTo(endLocation) >= 0) {
1370          seekTo(endLocation);
1371          return false;
1372        }
1373
1374        // check if what we are seeking is in the later part of the current
1375        // block.
1376        if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex())
1377            || (compareCursorKeyTo(key) >= 0)) {
1378          // sorry, we must seek to a different location first.
1379          seekTo(l);
1380        }
1381
1382        return inBlockAdvance(key, beyond);
1383      }
1384
1385      /**
1386       * Move the cursor to the new location. The entry returned by the previous
1387       * entry() call will be invalid.
1388       * 
1389       * @param l
1390       *          new cursor location. It must fall between the begin and end
1391       *          location of the scanner.
1392       * @throws IOException
1393       */
1394      private void seekTo(Location l) throws IOException {
1395        if (l.compareTo(beginLocation) < 0) {
1396          throw new IllegalArgumentException(
1397              "Attempt to seek before the begin location.");
1398        }
1399
1400        if (l.compareTo(endLocation) > 0) {
1401          throw new IllegalArgumentException(
1402              "Attempt to seek after the end location.");
1403        }
1404
1405        if (l.compareTo(endLocation) == 0) {
1406          parkCursorAtEnd();
1407          return;
1408        }
1409
1410        if (l.getBlockIndex() != currentLocation.getBlockIndex()) {
1411          // going to a totally different block
1412          initBlock(l.getBlockIndex());
1413        } else {
1414          if (valueChecked) {
1415            // may temporarily go beyond the last record in the block (in which
1416            // case the next if loop will always be true).
1417            inBlockAdvance(1);
1418          }
1419          if (l.getRecordIndex() < currentLocation.getRecordIndex()) {
1420            initBlock(l.getBlockIndex());
1421          }
1422        }
1423
1424        inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex());
1425
1426        return;
1427      }
1428
1429      /**
1430       * Rewind to the first entry in the scanner. The entry returned by the
1431       * previous entry() call will be invalid.
1432       * 
1433       * @throws IOException
1434       */
1435      public void rewind() throws IOException {
1436        seekTo(beginLocation);
1437      }
1438
1439      /**
1440       * Seek to the end of the scanner. The entry returned by the previous
1441       * entry() call will be invalid.
1442       * 
1443       * @throws IOException
1444       */
1445      public void seekToEnd() throws IOException {
1446        parkCursorAtEnd();
1447      }
1448
1449      /**
1450       * Move the cursor to the first entry whose key is greater than or equal
1451       * to the input key. Synonymous to lowerBound(key, 0, key.length). The
1452       * entry returned by the previous entry() call will be invalid.
1453       * 
1454       * @param key
1455       *          The input key
1456       * @throws IOException
1457       */
1458      public void lowerBound(byte[] key) throws IOException {
1459        lowerBound(key, 0, key.length);
1460      }
1461
1462      /**
1463       * Move the cursor to the first entry whose key is greater than or equal
1464       * to the input key. The entry returned by the previous entry() call will
1465       * be invalid.
1466       * 
1467       * @param key
1468       *          The input key
1469       * @param keyOffset
1470       *          offset in the key buffer.
1471       * @param keyLen
1472       *          key buffer length.
1473       * @throws IOException
1474       */
1475      public void lowerBound(byte[] key, int keyOffset, int keyLen)
1476          throws IOException {
1477        seekTo(new ByteArray(key, keyOffset, keyLen), false);
1478      }
1479
1480      /**
1481       * Move the cursor to the first entry whose key is strictly greater than
1482       * the input key. Synonymous to upperBound(key, 0, key.length). The entry
1483       * returned by the previous entry() call will be invalid.
1484       * 
1485       * @param key
1486       *          The input key
1487       * @throws IOException
1488       */
1489      public void upperBound(byte[] key) throws IOException {
1490        upperBound(key, 0, key.length);
1491      }
1492
1493      /**
1494       * Move the cursor to the first entry whose key is strictly greater than
1495       * the input key. The entry returned by the previous entry() call will be
1496       * invalid.
1497       * 
1498       * @param key
1499       *          The input key
1500       * @param keyOffset
1501       *          offset in the key buffer.
1502       * @param keyLen
1503       *          key buffer length.
1504       * @throws IOException
1505       */
1506      public void upperBound(byte[] key, int keyOffset, int keyLen)
1507          throws IOException {
1508        seekTo(new ByteArray(key, keyOffset, keyLen), true);
1509      }
1510
1511      /**
1512       * Move the cursor to the next key-value pair. The entry returned by the
1513       * previous entry() call will be invalid.
1514       * 
1515       * @return true if the cursor successfully moves. False when cursor is
1516       *         already at the end location and cannot be advanced.
1517       * @throws IOException
1518       */
1519      public boolean advance() throws IOException {
1520        if (atEnd()) {
1521          return false;
1522        }
1523
1524        int curBid = currentLocation.getBlockIndex();
1525        long curRid = currentLocation.getRecordIndex();
1526        long entriesInBlock = reader.getBlockEntryCount(curBid);
1527        if (curRid + 1 >= entriesInBlock) {
1528          if (endLocation.compareTo(curBid + 1, 0) <= 0) {
1529            // last entry in TFile.
1530            parkCursorAtEnd();
1531          } else {
1532            // last entry in Block.
1533            initBlock(curBid + 1);
1534          }
1535        } else {
1536          inBlockAdvance(1);
1537        }
1538        return true;
1539      }
1540
1541      /**
1542       * Load a compressed block for reading. Expecting blockIndex is valid.
1543       * 
1544       * @throws IOException
1545       */
1546      private void initBlock(int blockIndex) throws IOException {
1547        klen = -1;
1548        if (blkReader != null) {
1549          try {
1550            blkReader.close();
1551          } finally {
1552            blkReader = null;
1553          }
1554        }
1555        blkReader = reader.getBlockReader(blockIndex);
1556        currentLocation.set(blockIndex, 0);
1557      }
1558
1559      private void parkCursorAtEnd() throws IOException {
1560        klen = -1;
1561        currentLocation.set(endLocation);
1562        if (blkReader != null) {
1563          try {
1564            blkReader.close();
1565          } finally {
1566            blkReader = null;
1567          }
1568        }
1569      }
1570
1571      /**
1572       * Close the scanner. Release all resources. The behavior of using the
1573       * scanner after calling close is not defined. The entry returned by the
1574       * previous entry() call will be invalid.
1575       */
1576      public void close() throws IOException {
1577        parkCursorAtEnd();
1578      }
1579
1580      /**
1581       * Is cursor at the end location?
1582       * 
1583       * @return true if the cursor is at the end location.
1584       */
1585      public boolean atEnd() {
1586        return (currentLocation.compareTo(endLocation) >= 0);
1587      }
1588
1589      /**
1590       * check whether we have already successfully obtained the key. It also
1591       * initializes the valueInputStream.
1592       */
1593      void checkKey() throws IOException {
1594        if (klen >= 0) return;
1595        if (atEnd()) {
1596          throw new EOFException("No key-value to read");
1597        }
1598        klen = -1;
1599        vlen = -1;
1600        valueChecked = false;
1601
1602        klen = Utils.readVInt(blkReader);
1603        blkReader.readFully(keyBuffer, 0, klen);
1604        valueBufferInputStream.reset(blkReader);
1605        if (valueBufferInputStream.isLastChunk()) {
1606          vlen = valueBufferInputStream.getRemain();
1607        }
1608      }
1609
1610      /**
1611       * Get an entry to access the key and value.
1612       * 
1613       * @return The Entry object to access the key and value.
1614       * @throws IOException
1615       */
1616      public Entry entry() throws IOException {
1617        checkKey();
1618        return new Entry();
1619      }
1620
1621      /**
1622       * Get the RecordNum corresponding to the entry pointed by the cursor.
1623       * @return The RecordNum corresponding to the entry pointed by the cursor.
1624       * @throws IOException
1625       */
1626      public long getRecordNum() throws IOException {
1627        return reader.getRecordNumByLocation(currentLocation);
1628      }
1629      
1630      /**
1631       * Internal API. Comparing the key at cursor to user-specified key.
1632       * 
1633       * @param other
1634       *          user-specified key.
1635       * @return negative if key at cursor is smaller than user key; 0 if equal;
1636       *         and positive if key at cursor greater than user key.
1637       * @throws IOException
1638       */
1639      int compareCursorKeyTo(RawComparable other) throws IOException {
1640        checkKey();
1641        return reader.compareKeys(keyBuffer, 0, klen, other.buffer(), other
1642            .offset(), other.size());
1643      }
1644
1645      /**
1646       * Entry to a &lt;Key, Value&gt; pair.
1647       */
1648      public class Entry implements Comparable<RawComparable> {
1649        /**
1650         * Get the length of the key.
1651         * 
1652         * @return the length of the key.
1653         */
1654        public int getKeyLength() {
1655          return klen;
1656        }
1657
1658        byte[] getKeyBuffer() {
1659          return keyBuffer;
1660        }
1661
1662        /**
1663         * Copy the key and value in one shot into BytesWritables. This is
1664         * equivalent to getKey(key); getValue(value);
1665         * 
1666         * @param key
1667         *          BytesWritable to hold key.
1668         * @param value
1669         *          BytesWritable to hold value
1670         * @throws IOException
1671         */
1672        public void get(BytesWritable key, BytesWritable value)
1673            throws IOException {
1674          getKey(key);
1675          getValue(value);
1676        }
1677
1678        /**
1679         * Copy the key into BytesWritable. The input BytesWritable will be
1680         * automatically resized to the actual key size.
1681         * 
1682         * @param key
1683         *          BytesWritable to hold the key.
1684         * @throws IOException
1685         */
1686        public int getKey(BytesWritable key) throws IOException {
1687          key.setSize(getKeyLength());
1688          getKey(key.getBytes());
1689          return key.getLength();
1690        }
1691
1692        /**
1693         * Copy the value into BytesWritable. The input BytesWritable will be
1694         * automatically resized to the actual value size. The implementation
1695         * directly uses the buffer inside BytesWritable for storing the value.
1696         * The call does not require the value length to be known.
1697         * 
1698         * @param value
1699         * @throws IOException
1700         */
1701        public long getValue(BytesWritable value) throws IOException {
1702          DataInputStream dis = getValueStream();
1703          int size = 0;
1704          try {
1705            int remain;
1706            while ((remain = valueBufferInputStream.getRemain()) > 0) {
1707              value.setSize(size + remain);
1708              dis.readFully(value.getBytes(), size, remain);
1709              size += remain;
1710            }
1711            return value.getLength();
1712          } finally {
1713            dis.close();
1714          }
1715        }
1716
1717        /**
1718         * Writing the key to the output stream. This method avoids copying key
1719         * buffer from Scanner into user buffer, then writing to the output
1720         * stream.
1721         * 
1722         * @param out
1723         *          The output stream
1724         * @return the length of the key.
1725         * @throws IOException
1726         */
1727        public int writeKey(OutputStream out) throws IOException {
1728          out.write(keyBuffer, 0, klen);
1729          return klen;
1730        }
1731
1732        /**
1733         * Writing the value to the output stream. This method avoids copying
1734         * value data from Scanner into user buffer, then writing to the output
1735         * stream. It does not require the value length to be known.
1736         * 
1737         * @param out
1738         *          The output stream
1739         * @return the length of the value
1740         * @throws IOException
1741         */
1742        public long writeValue(OutputStream out) throws IOException {
1743          DataInputStream dis = getValueStream();
1744          long size = 0;
1745          try {
1746            int chunkSize;
1747            while ((chunkSize = valueBufferInputStream.getRemain()) > 0) {
1748              chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE);
1749              valTransferBuffer.setSize(chunkSize);
1750              dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize);
1751              out.write(valTransferBuffer.getBytes(), 0, chunkSize);
1752              size += chunkSize;
1753            }
1754            return size;
1755          } finally {
1756            dis.close();
1757          }
1758        }
1759
1760        /**
1761         * Copy the key into user supplied buffer.
1762         * 
1763         * @param buf
1764         *          The buffer supplied by user. The length of the buffer must
1765         *          not be shorter than the key length.
1766         * @return The length of the key.
1767         * 
1768         * @throws IOException
1769         */
1770        public int getKey(byte[] buf) throws IOException {
1771          return getKey(buf, 0);
1772        }
1773
1774        /**
1775         * Copy the key into user supplied buffer.
1776         * 
1777         * @param buf
1778         *          The buffer supplied by user.
1779         * @param offset
1780         *          The starting offset of the user buffer where we should copy
1781         *          the key into. Requiring the key-length + offset no greater
1782         *          than the buffer length.
1783         * @return The length of the key.
1784         * @throws IOException
1785         */
1786        public int getKey(byte[] buf, int offset) throws IOException {
1787          if ((offset | (buf.length - offset - klen)) < 0) {
1788            throw new IndexOutOfBoundsException(
1789                "Bufer not enough to store the key");
1790          }
1791          System.arraycopy(keyBuffer, 0, buf, offset, klen);
1792          return klen;
1793        }
1794
1795        /**
1796         * Streaming access to the key. Useful for desrializing the key into
1797         * user objects.
1798         * 
1799         * @return The input stream.
1800         */
1801        public DataInputStream getKeyStream() {
1802          keyDataInputStream.reset(keyBuffer, klen);
1803          return keyDataInputStream;
1804        }
1805
1806        /**
1807         * Get the length of the value. isValueLengthKnown() must be tested
1808         * true.
1809         * 
1810         * @return the length of the value.
1811         */
1812        public int getValueLength() {
1813          if (vlen >= 0) {
1814            return vlen;
1815          }
1816
1817          throw new RuntimeException("Value length unknown.");
1818        }
1819
1820        /**
1821         * Copy value into user-supplied buffer. User supplied buffer must be
1822         * large enough to hold the whole value. The value part of the key-value
1823         * pair pointed by the current cursor is not cached and can only be
1824         * examined once. Calling any of the following functions more than once
1825         * without moving the cursor will result in exception:
1826         * {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
1827         * {@link #getValueStream}.
1828         * 
1829         * @return the length of the value. Does not require
1830         *         isValueLengthKnown() to be true.
1831         * @throws IOException
1832         * 
1833         */
1834        public int getValue(byte[] buf) throws IOException {
1835          return getValue(buf, 0);
1836        }
1837
1838        /**
1839         * Copy value into user-supplied buffer. User supplied buffer must be
1840         * large enough to hold the whole value (starting from the offset). The
1841         * value part of the key-value pair pointed by the current cursor is not
1842         * cached and can only be examined once. Calling any of the following
1843         * functions more than once without moving the cursor will result in
1844         * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
1845         * {@link #getValueStream}.
1846         * 
1847         * @return the length of the value. Does not require
1848         *         isValueLengthKnown() to be true.
1849         * @throws IOException
1850         */
1851        public int getValue(byte[] buf, int offset) throws IOException {
1852          DataInputStream dis = getValueStream();
1853          try {
1854            if (isValueLengthKnown()) {
1855              if ((offset | (buf.length - offset - vlen)) < 0) {
1856                throw new IndexOutOfBoundsException(
1857                    "Buffer too small to hold value");
1858              }
1859              dis.readFully(buf, offset, vlen);
1860              return vlen;
1861            }
1862
1863            int nextOffset = offset;
1864            while (nextOffset < buf.length) {
1865              int n = dis.read(buf, nextOffset, buf.length - nextOffset);
1866              if (n < 0) {
1867                break;
1868              }
1869              nextOffset += n;
1870            }
1871            if (dis.read() >= 0) {
1872              // attempt to read one more byte to determine whether we reached
1873              // the
1874              // end or not.
1875              throw new IndexOutOfBoundsException(
1876                  "Buffer too small to hold value");
1877            }
1878            return nextOffset - offset;
1879          } finally {
1880            dis.close();
1881          }
1882        }
1883
1884        /**
1885         * Stream access to value. The value part of the key-value pair pointed
1886         * by the current cursor is not cached and can only be examined once.
1887         * Calling any of the following functions more than once without moving
1888         * the cursor will result in exception: {@link #getValue(byte[])},
1889         * {@link #getValue(byte[], int)}, {@link #getValueStream}.
1890         * 
1891         * @return The input stream for reading the value.
1892         * @throws IOException
1893         */
1894        public DataInputStream getValueStream() throws IOException {
1895          if (valueChecked == true) {
1896            throw new IllegalStateException(
1897                "Attempt to examine value multiple times.");
1898          }
1899          valueChecked = true;
1900          return valueDataInputStream;
1901        }
1902
1903        /**
1904         * Check whether it is safe to call getValueLength().
1905         * 
1906         * @return true if value length is known before hand. Values less than
1907         *         the chunk size will always have their lengths known before
1908         *         hand. Values that are written out as a whole (with advertised
1909         *         length up-front) will always have their lengths known in
1910         *         read.
1911         */
1912        public boolean isValueLengthKnown() {
1913          return (vlen >= 0);
1914        }
1915
1916        /**
1917         * Compare the entry key to another key. Synonymous to compareTo(key, 0,
1918         * key.length).
1919         * 
1920         * @param buf
1921         *          The key buffer.
1922         * @return comparison result between the entry key with the input key.
1923         */
1924        public int compareTo(byte[] buf) {
1925          return compareTo(buf, 0, buf.length);
1926        }
1927
1928        /**
1929         * Compare the entry key to another key. Synonymous to compareTo(new
1930         * ByteArray(buf, offset, length)
1931         * 
1932         * @param buf
1933         *          The key buffer
1934         * @param offset
1935         *          offset into the key buffer.
1936         * @param length
1937         *          the length of the key.
1938         * @return comparison result between the entry key with the input key.
1939         */
1940        public int compareTo(byte[] buf, int offset, int length) {
1941          return compareTo(new ByteArray(buf, offset, length));
1942        }
1943
1944        /**
1945         * Compare an entry with a RawComparable object. This is useful when
1946         * Entries are stored in a collection, and we want to compare a user
1947         * supplied key.
1948         */
1949        @Override
1950        public int compareTo(RawComparable key) {
1951          return reader.compareKeys(keyBuffer, 0, getKeyLength(), key.buffer(),
1952              key.offset(), key.size());
1953        }
1954
1955        /**
1956         * Compare whether this and other points to the same key value.
1957         */
1958        @Override
1959        public boolean equals(Object other) {
1960          if (this == other) return true;
1961          if (!(other instanceof Entry)) return false;
1962          return ((Entry) other).compareTo(keyBuffer, 0, getKeyLength()) == 0;
1963        }
1964
1965        @Override
1966        public int hashCode() {
1967          return WritableComparator.hashBytes(keyBuffer, 0, getKeyLength());
1968        }
1969      }
1970
1971      /**
1972       * Advance cursor by n positions within the block.
1973       * 
1974       * @param n
1975       *          Number of key-value pairs to skip in block.
1976       * @throws IOException
1977       */
1978      private void inBlockAdvance(long n) throws IOException {
1979        for (long i = 0; i < n; ++i) {
1980          checkKey();
1981          if (!valueBufferInputStream.isClosed()) {
1982            valueBufferInputStream.close();
1983          }
1984          klen = -1;
1985          currentLocation.incRecordIndex();
1986        }
1987      }
1988
1989      /**
1990       * Advance cursor in block until we find a key that is greater than or
1991       * equal to the input key.
1992       * 
1993       * @param key
1994       *          Key to compare.
1995       * @param greater
1996       *          advance until we find a key greater than the input key.
1997       * @return true if we find a equal key.
1998       * @throws IOException
1999       */
2000      private boolean inBlockAdvance(RawComparable key, boolean greater)
2001          throws IOException {
2002        int curBid = currentLocation.getBlockIndex();
2003        long entryInBlock = reader.getBlockEntryCount(curBid);
2004        if (curBid == endLocation.getBlockIndex()) {
2005          entryInBlock = endLocation.getRecordIndex();
2006        }
2007
2008        while (currentLocation.getRecordIndex() < entryInBlock) {
2009          int cmp = compareCursorKeyTo(key);
2010          if (cmp > 0) return false;
2011          if (cmp == 0 && !greater) return true;
2012          if (!valueBufferInputStream.isClosed()) {
2013            valueBufferInputStream.close();
2014          }
2015          klen = -1;
2016          currentLocation.incRecordIndex();
2017        }
2018
2019        throw new RuntimeException("Cannot find matching key in block.");
2020      }
2021    }
2022
2023    long getBlockEntryCount(int curBid) {
2024      return tfileIndex.getEntry(curBid).entries();
2025    }
2026
2027    BlockReader getBlockReader(int blockIndex) throws IOException {
2028      return readerBCF.getDataBlock(blockIndex);
2029    }
2030  }
2031
2032  /**
2033   * Data structure representing "TFile.meta" meta block.
2034   */
2035  static final class TFileMeta {
2036    final static String BLOCK_NAME = "TFile.meta";
2037    final Version version;
2038    private long recordCount;
2039    private final String strComparator;
2040    private final BytesComparator comparator;
2041
2042    // ctor for writes
2043    public TFileMeta(String comparator) {
2044      // set fileVersion to API version when we create it.
2045      version = TFile.API_VERSION;
2046      recordCount = 0;
2047      strComparator = (comparator == null) ? "" : comparator;
2048      this.comparator = makeComparator(strComparator);
2049    }
2050
2051    // ctor for reads
2052    public TFileMeta(DataInput in) throws IOException {
2053      version = new Version(in);
2054      if (!version.compatibleWith(TFile.API_VERSION)) {
2055        throw new RuntimeException("Incompatible TFile fileVersion.");
2056      }
2057      recordCount = Utils.readVLong(in);
2058      strComparator = Utils.readString(in);
2059      comparator = makeComparator(strComparator);
2060    }
2061
2062    @SuppressWarnings("unchecked")
2063    static BytesComparator makeComparator(String comparator) {
2064      if (comparator.length() == 0) {
2065        // unsorted keys
2066        return null;
2067      }
2068      if (comparator.equals(COMPARATOR_MEMCMP)) {
2069        // default comparator
2070        return new BytesComparator(new MemcmpRawComparator());
2071      } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
2072        String compClassName =
2073            comparator.substring(COMPARATOR_JCLASS.length()).trim();
2074        try {
2075          Class compClass = Class.forName(compClassName);
2076          // use its default ctor to create an instance
2077          return new BytesComparator((RawComparator<Object>) compClass
2078              .newInstance());
2079        } catch (Exception e) {
2080          throw new IllegalArgumentException(
2081              "Failed to instantiate comparator: " + comparator + "("
2082                  + e.toString() + ")");
2083        }
2084      } else {
2085        throw new IllegalArgumentException("Unsupported comparator: "
2086            + comparator);
2087      }
2088    }
2089
2090    public void write(DataOutput out) throws IOException {
2091      TFile.API_VERSION.write(out);
2092      Utils.writeVLong(out, recordCount);
2093      Utils.writeString(out, strComparator);
2094    }
2095
2096    public long getRecordCount() {
2097      return recordCount;
2098    }
2099
2100    public void incRecordCount() {
2101      ++recordCount;
2102    }
2103
2104    public boolean isSorted() {
2105      return !strComparator.equals("");
2106    }
2107
2108    public String getComparatorString() {
2109      return strComparator;
2110    }
2111
2112    public BytesComparator getComparator() {
2113      return comparator;
2114    }
2115
2116    public Version getVersion() {
2117      return version;
2118    }
2119  } // END: class MetaTFileMeta
2120
2121  /**
2122   * Data structure representing "TFile.index" meta block.
2123   */
2124  static class TFileIndex {
2125    final static String BLOCK_NAME = "TFile.index";
2126    private ByteArray firstKey;
2127    private final ArrayList<TFileIndexEntry> index;
2128    private final ArrayList<Long> recordNumIndex;
2129    private final BytesComparator comparator;
2130    private long sum = 0;
2131    
2132    /**
2133     * For reading from file.
2134     * 
2135     * @throws IOException
2136     */
2137    public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
2138        throws IOException {
2139      index = new ArrayList<TFileIndexEntry>(entryCount);
2140      recordNumIndex = new ArrayList<Long>(entryCount);
2141      int size = Utils.readVInt(in); // size for the first key entry.
2142      if (size > 0) {
2143        byte[] buffer = new byte[size];
2144        in.readFully(buffer);
2145        DataInputStream firstKeyInputStream =
2146            new DataInputStream(new ByteArrayInputStream(buffer, 0, size));
2147
2148        int firstKeyLength = Utils.readVInt(firstKeyInputStream);
2149        firstKey = new ByteArray(new byte[firstKeyLength]);
2150        firstKeyInputStream.readFully(firstKey.buffer());
2151
2152        for (int i = 0; i < entryCount; i++) {
2153          size = Utils.readVInt(in);
2154          if (buffer.length < size) {
2155            buffer = new byte[size];
2156          }
2157          in.readFully(buffer, 0, size);
2158          TFileIndexEntry idx =
2159              new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
2160                  buffer, 0, size)));
2161          index.add(idx);
2162          sum += idx.entries();
2163          recordNumIndex.add(sum);
2164        }
2165      } else {
2166        if (entryCount != 0) {
2167          throw new RuntimeException("Internal error");
2168        }
2169      }
2170      this.comparator = comparator;
2171    }
2172
2173    /**
2174     * @param key
2175     *          input key.
2176     * @return the ID of the first block that contains key >= input key. Or -1
2177     *         if no such block exists.
2178     */
2179    public int lowerBound(RawComparable key) {
2180      if (comparator == null) {
2181        throw new RuntimeException("Cannot search in unsorted TFile");
2182      }
2183
2184      if (firstKey == null) {
2185        return -1; // not found
2186      }
2187
2188      int ret = Utils.lowerBound(index, key, comparator);
2189      if (ret == index.size()) {
2190        return -1;
2191      }
2192      return ret;
2193    }
2194
2195    /**
2196     * @param key
2197     *          input key.
2198     * @return the ID of the first block that contains key > input key. Or -1
2199     *         if no such block exists.
2200     */
2201    public int upperBound(RawComparable key) {
2202      if (comparator == null) {
2203        throw new RuntimeException("Cannot search in unsorted TFile");
2204      }
2205
2206      if (firstKey == null) {
2207        return -1; // not found
2208      }
2209
2210      int ret = Utils.upperBound(index, key, comparator);
2211      if (ret == index.size()) {
2212        return -1;
2213      }
2214      return ret;
2215    }
2216
2217    /**
2218     * For writing to file.
2219     */
2220    public TFileIndex(BytesComparator comparator) {
2221      index = new ArrayList<TFileIndexEntry>();
2222      recordNumIndex = new ArrayList<Long>();
2223      this.comparator = comparator;
2224    }
2225
2226    public RawComparable getFirstKey() {
2227      return firstKey;
2228    }
2229    
2230    public Reader.Location getLocationByRecordNum(long recNum) {
2231      int idx = Utils.upperBound(recordNumIndex, recNum);
2232      long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1);
2233      return new Reader.Location(idx, recNum-lastRecNum);
2234    }
2235
2236    public long getRecordNumByLocation(Reader.Location location) {
2237      int blkIndex = location.getBlockIndex();
2238      long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1);
2239      return lastRecNum + location.getRecordIndex();
2240    }
2241    
2242    public void setFirstKey(byte[] key, int offset, int length) {
2243      firstKey = new ByteArray(new byte[length]);
2244      System.arraycopy(key, offset, firstKey.buffer(), 0, length);
2245    }
2246
2247    public RawComparable getLastKey() {
2248      if (index.size() == 0) {
2249        return null;
2250      }
2251      return new ByteArray(index.get(index.size() - 1).buffer());
2252    }
2253
2254    public void addEntry(TFileIndexEntry keyEntry) {
2255      index.add(keyEntry);
2256      sum += keyEntry.entries();
2257      recordNumIndex.add(sum);
2258    }
2259
2260    public TFileIndexEntry getEntry(int bid) {
2261      return index.get(bid);
2262    }
2263
2264    public void write(DataOutput out) throws IOException {
2265      if (firstKey == null) {
2266        Utils.writeVInt(out, 0);
2267        return;
2268      }
2269
2270      DataOutputBuffer dob = new DataOutputBuffer();
2271      Utils.writeVInt(dob, firstKey.size());
2272      dob.write(firstKey.buffer());
2273      Utils.writeVInt(out, dob.size());
2274      out.write(dob.getData(), 0, dob.getLength());
2275
2276      for (TFileIndexEntry entry : index) {
2277        dob.reset();
2278        entry.write(dob);
2279        Utils.writeVInt(out, dob.getLength());
2280        out.write(dob.getData(), 0, dob.getLength());
2281      }
2282    }
2283  }
2284
2285  /**
2286   * TFile Data Index entry. We should try to make the memory footprint of each
2287   * index entry as small as possible.
2288   */
2289  static final class TFileIndexEntry implements RawComparable {
2290    final byte[] key;
2291    // count of <key, value> entries in the block.
2292    final long kvEntries;
2293
2294    public TFileIndexEntry(DataInput in) throws IOException {
2295      int len = Utils.readVInt(in);
2296      key = new byte[len];
2297      in.readFully(key, 0, len);
2298      kvEntries = Utils.readVLong(in);
2299    }
2300
2301    // default entry, without any padding
2302    public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) {
2303      key = new byte[len];
2304      System.arraycopy(newkey, offset, key, 0, len);
2305      this.kvEntries = entries;
2306    }
2307
2308    @Override
2309    public byte[] buffer() {
2310      return key;
2311    }
2312
2313    @Override
2314    public int offset() {
2315      return 0;
2316    }
2317
2318    @Override
2319    public int size() {
2320      return key.length;
2321    }
2322
2323    long entries() {
2324      return kvEntries;
2325    }
2326
2327    public void write(DataOutput out) throws IOException {
2328      Utils.writeVInt(out, key.length);
2329      out.write(key, 0, key.length);
2330      Utils.writeVLong(out, kvEntries);
2331    }
2332  }
2333
2334  /**
2335   * Dumping the TFile information.
2336   * 
2337   * @param args
2338   *          A list of TFile paths.
2339   */
2340  public static void main(String[] args) {
2341    System.out.printf("TFile Dumper (TFile %s, BCFile %s)\n", TFile.API_VERSION
2342        .toString(), BCFile.API_VERSION.toString());
2343    if (args.length == 0) {
2344      System.out
2345          .println("Usage: java ... org.apache.hadoop.io.file.tfile.TFile tfile-path [tfile-path ...]");
2346      System.exit(0);
2347    }
2348    Configuration conf = new Configuration();
2349
2350    for (String file : args) {
2351      System.out.println("===" + file + "===");
2352      try {
2353        TFileDumper.dumpInfo(file, System.out, conf);
2354      } catch (IOException e) {
2355        e.printStackTrace(System.err);
2356      }
2357    }
2358  }
2359}