001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with this
004 * work for additional information regarding copyright ownership. The ASF
005 * licenses this file to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 * 
009 * http://www.apache.org/licenses/LICENSE-2.0
010 * 
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
014 * License for the specific language governing permissions and limitations under
015 * the License.
016 */
017
018package org.apache.hadoop.io.file.tfile;
019
020import java.io.ByteArrayInputStream;
021import java.io.Closeable;
022import java.io.DataInput;
023import java.io.DataInputStream;
024import java.io.DataOutput;
025import java.io.DataOutputStream;
026import java.io.EOFException;
027import java.io.IOException;
028import java.io.OutputStream;
029import java.util.ArrayList;
030import java.util.Comparator;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.apache.hadoop.classification.InterfaceAudience;
035import org.apache.hadoop.classification.InterfaceStability;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FSDataInputStream;
038import org.apache.hadoop.fs.FSDataOutputStream;
039import org.apache.hadoop.io.BoundedByteArrayOutputStream;
040import org.apache.hadoop.io.BytesWritable;
041import org.apache.hadoop.io.DataInputBuffer;
042import org.apache.hadoop.io.DataOutputBuffer;
043import org.apache.hadoop.io.IOUtils;
044import org.apache.hadoop.io.RawComparator;
045import org.apache.hadoop.io.WritableComparator;
046import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader;
047import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender;
048import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder;
049import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder;
050import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator;
051import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator;
052import org.apache.hadoop.io.file.tfile.Utils.Version;
053import org.apache.hadoop.io.serializer.JavaSerializationComparator;
054
055/**
056 * A TFile is a container of key-value pairs. Both keys and values are type-less
057 * bytes. Keys are restricted to 64KB, value length is not restricted
058 * (practically limited to the available disk storage). TFile further provides
059 * the following features:
060 * <ul>
061 * <li>Block Compression.
062 * <li>Named meta data blocks.
063 * <li>Sorted or unsorted keys.
064 * <li>Seek by key or by file offset.
065 * </ul>
066 * The memory footprint of a TFile includes the following:
067 * <ul>
068 * <li>Some constant overhead of reading or writing a compressed block.
069 * <ul>
070 * <li>Each compressed block requires one compression/decompression codec for
071 * I/O.
072 * <li>Temporary space to buffer the key.
073 * <li>Temporary space to buffer the value (for TFile.Writer only). Values are
074 * chunk encoded, so that we buffer at most one chunk of user data. By default,
075 * the chunk buffer is 1MB. Reading chunked value does not require additional
076 * memory.
077 * </ul>
078 * <li>TFile index, which is proportional to the total number of Data Blocks.
079 * The total amount of memory needed to hold the index can be estimated as
080 * (56+AvgKeySize)*NumBlocks.
 * <li>MetaBlock index, which is proportional to the total number of Meta
 * Blocks. The total amount of memory needed to hold the index for Meta Blocks
 * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock.
084 * </ul>
085 * <p>
086 * The behavior of TFile can be customized by the following variables through
087 * Configuration:
088 * <ul>
 * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes).
 * Defaults to 1MB. Values shorter than the chunk size are guaranteed to have a
 * known value length at read time (see
 * {@link TFile.Reader.Scanner.Entry#isValueLengthKnown()}).
093 * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for
094 * FSDataOutputStream. Integer (in bytes). Default to 256KB.
095 * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for
096 * FSDataInputStream. Integer (in bytes). Default to 256KB.
097 * </ul>
098 * <p>
099 * Suggestions on performance optimization.
100 * <ul>
101 * <li>Minimum block size. We recommend a setting of minimum block size between
102 * 256KB to 1MB for general usage. Larger block size is preferred if files are
103 * primarily for sequential access. However, it would lead to inefficient random
104 * access (because there are more data to decompress). Smaller blocks are good
105 * for random access, but require more memory to hold the block index, and may
106 * be slower to create (because we must flush the compressor stream at the
107 * conclusion of each data block, which leads to an FS I/O flush). Further, due
108 * to the internal caching in Compression codec, the smallest possible block
109 * size would be around 20KB-30KB.
110 * <li>The current implementation does not offer true multi-threading for
111 * reading. The implementation uses FSDataInputStream seek()+read(), which is
112 * shown to be much faster than positioned-read call in single thread mode.
113 * However, it also means that if multiple threads attempt to access the same
114 * TFile (using multiple scanners) simultaneously, the actual I/O is carried out
115 * sequentially even if they access different DFS blocks.
 * <li>Compression codec. Use "none" if the data is not very compressible (by
 * compressible, we mean a compression ratio of at least 2:1). Generally, use
 * "lzo" as the starting point for experimenting. "gz" offers a slightly better
 * compression ratio than "lzo" but requires 4x CPU to compress and 2x CPU to
 * decompress, compared to "lzo".
 * <li>File system buffering. If the underlying FSDataInputStream and
 * FSDataOutputStream are already adequately buffered, or if applications
 * read/write keys and values in large buffers, we can reduce the sizes of
 * input/output buffering in TFile layer by setting the configuration parameters
 * "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size".
126 * </ul>
127 * 
128 * Some design rationale behind TFile can be found at <a
129 * href=https://issues.apache.org/jira/browse/HADOOP-3315>Hadoop-3315</a>.
130 */
131@InterfaceAudience.Public
132@InterfaceStability.Evolving
133public class TFile {
134  static final Log LOG = LogFactory.getLog(TFile.class);
135
136  private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size";
137  private static final String FS_INPUT_BUF_SIZE_ATTR =
138      "tfile.fs.input.buffer.size";
139  private static final String FS_OUTPUT_BUF_SIZE_ATTR =
140      "tfile.fs.output.buffer.size";
141
142  static int getChunkBufferSize(Configuration conf) {
143    int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024);
144    return (ret > 0) ? ret : 1024 * 1024;
145  }
146
147  static int getFSInputBufferSize(Configuration conf) {
148    return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
149  }
150
151  static int getFSOutputBufferSize(Configuration conf) {
152    return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024);
153  }
154
155  private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB
156  static final Version API_VERSION = new Version((short) 1, (short) 0);
157
158  /** compression: gzip */
159  public static final String COMPRESSION_GZ = "gz";
160  /** compression: lzo */
161  public static final String COMPRESSION_LZO = "lzo";
162  /** compression: none */
163  public static final String COMPRESSION_NONE = "none";
164  /** comparator: memcmp */
165  public static final String COMPARATOR_MEMCMP = "memcmp";
166  /** comparator prefix: java class */
167  public static final String COMPARATOR_JCLASS = "jclass:";
168
169  /**
170   * Make a raw comparator from a string name.
171   * 
172   * @param name
173   *          Comparator name
174   * @return A RawComparable comparator.
175   */
176  static public Comparator<RawComparable> makeComparator(String name) {
177    return TFileMeta.makeComparator(name);
178  }
179
180  // Prevent the instantiation of TFiles
181  private TFile() {
182    // nothing
183  }
184
185  /**
186   * Get names of supported compression algorithms. The names are acceptable by
187   * TFile.Writer.
188   * 
189   * @return Array of strings, each represents a supported compression
190   *         algorithm. Currently, the following compression algorithms are
191   *         supported.
192   *         <ul>
193   *         <li>"none" - No compression.
194   *         <li>"lzo" - LZO compression.
195   *         <li>"gz" - GZIP compression.
196   *         </ul>
197   */
198  public static String[] getSupportedCompressionAlgorithms() {
199    return Compression.getSupportedAlgorithms();
200  }
201
202  /**
203   * TFile Writer.
204   */
205  @InterfaceStability.Evolving
206  public static class Writer implements Closeable {
207    // minimum compressed size for a block.
208    private final int sizeMinBlock;
209
210    // Meta blocks.
211    final TFileIndex tfileIndex;
212    final TFileMeta tfileMeta;
213
214    // reference to the underlying BCFile.
215    private BCFile.Writer writerBCF;
216
217    // current data block appender.
218    BlockAppender blkAppender;
219    long blkRecordCount;
220
221    // buffers for caching the key.
222    BoundedByteArrayOutputStream currentKeyBufferOS;
223    BoundedByteArrayOutputStream lastKeyBufferOS;
224
225    // buffer used by chunk codec
226    private byte[] valueBuffer;
227
228    /**
229     * Writer states. The state always transits in circles: READY -> IN_KEY ->
230     * END_KEY -> IN_VALUE -> READY.
231     */
232    private enum State {
233      READY, // Ready to start a new key-value pair insertion.
234      IN_KEY, // In the middle of key insertion.
235      END_KEY, // Key insertion complete, ready to insert value.
236      IN_VALUE, // In value insertion.
237      // ERROR, // Error encountered, cannot continue.
238      CLOSED, // TFile already closed.
239    };
240
241    // current state of Writer.
242    State state = State.READY;
243    Configuration conf;
244    long errorCount = 0;
245
246    /**
247     * Constructor
248     * 
249     * @param fsdos
250     *          output stream for writing. Must be at position 0.
251     * @param minBlockSize
252     *          Minimum compressed block size in bytes. A compression block will
253     *          not be closed until it reaches this size except for the last
254     *          block.
255     * @param compressName
256     *          Name of the compression algorithm. Must be one of the strings
257     *          returned by {@link TFile#getSupportedCompressionAlgorithms()}.
258     * @param comparator
259     *          Leave comparator as null or empty string if TFile is not sorted.
260     *          Otherwise, provide the string name for the comparison algorithm
261     *          for keys. Two kinds of comparators are supported.
262     *          <ul>
263     *          <li>Algorithmic comparator: binary comparators that is language
264     *          independent. Currently, only "memcmp" is supported.
265     *          <li>Language-specific comparator: binary comparators that can
266     *          only be constructed in specific language. For Java, the syntax
267     *          is "jclass:", followed by the class name of the RawComparator.
268     *          Currently, we only support RawComparators that can be
269     *          constructed through the default constructor (with no
270     *          parameters). Parameterized RawComparators such as
271     *          {@link WritableComparator} or
272     *          {@link JavaSerializationComparator} may not be directly used.
273     *          One should write a wrapper class that inherits from such classes
274     *          and use its default constructor to perform proper
275     *          initialization.
276     *          </ul>
277     * @param conf
278     *          The configuration object.
279     * @throws IOException
280     */
281    public Writer(FSDataOutputStream fsdos, int minBlockSize,
282        String compressName, String comparator, Configuration conf)
283        throws IOException {
284      sizeMinBlock = minBlockSize;
285      tfileMeta = new TFileMeta(comparator);
286      tfileIndex = new TFileIndex(tfileMeta.getComparator());
287
288      writerBCF = new BCFile.Writer(fsdos, compressName, conf);
289      currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
290      lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE);
291      this.conf = conf;
292    }
293
294    /**
295     * Close the Writer. Resources will be released regardless of the exceptions
296     * being thrown. Future close calls will have no effect.
297     * 
298     * The underlying FSDataOutputStream is not closed.
299     */
300    @Override
301    public void close() throws IOException {
302      if ((state == State.CLOSED)) {
303        return;
304      }
305      try {
306        // First try the normal finish.
307        // Terminate upon the first Exception.
308        if (errorCount == 0) {
309          if (state != State.READY) {
310            throw new IllegalStateException(
311                "Cannot close TFile in the middle of key-value insertion.");
312          }
313
314          finishDataBlock(true);
315
316          // first, write out data:TFile.meta
317          BlockAppender outMeta =
318              writerBCF
319                  .prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE);
320          try {
321            tfileMeta.write(outMeta);
322          } finally {
323            outMeta.close();
324          }
325
326          // second, write out data:TFile.index
327          BlockAppender outIndex =
328              writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME);
329          try {
330            tfileIndex.write(outIndex);
331          } finally {
332            outIndex.close();
333          }
334
335          writerBCF.close();
336        }
337      } finally {
338        IOUtils.cleanup(LOG, blkAppender, writerBCF);
339        blkAppender = null;
340        writerBCF = null;
341        state = State.CLOSED;
342      }
343    }
344
345    /**
346     * Adding a new key-value pair to the TFile. This is synonymous to
347     * append(key, 0, key.length, value, 0, value.length)
348     * 
349     * @param key
350     *          Buffer for key.
351     * @param value
352     *          Buffer for value.
353     * @throws IOException
354     */
355    public void append(byte[] key, byte[] value) throws IOException {
356      append(key, 0, key.length, value, 0, value.length);
357    }
358
359    /**
360     * Adding a new key-value pair to TFile.
361     * 
362     * @param key
363     *          buffer for key.
364     * @param koff
365     *          offset in key buffer.
366     * @param klen
367     *          length of key.
368     * @param value
369     *          buffer for value.
370     * @param voff
371     *          offset in value buffer.
372     * @param vlen
373     *          length of value.
374     * @throws IOException
375     *           Upon IO errors.
376     *           <p>
377     *           If an exception is thrown, the TFile will be in an inconsistent
378     *           state. The only legitimate call after that would be close
379     */
380    public void append(byte[] key, int koff, int klen, byte[] value, int voff,
381        int vlen) throws IOException {
382      if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) {
383        throw new IndexOutOfBoundsException(
384            "Bad key buffer offset-length combination.");
385      }
386
387      if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) {
388        throw new IndexOutOfBoundsException(
389            "Bad value buffer offset-length combination.");
390      }
391
392      try {
393        DataOutputStream dosKey = prepareAppendKey(klen);
394        try {
395          ++errorCount;
396          dosKey.write(key, koff, klen);
397          --errorCount;
398        } finally {
399          dosKey.close();
400        }
401
402        DataOutputStream dosValue = prepareAppendValue(vlen);
403        try {
404          ++errorCount;
405          dosValue.write(value, voff, vlen);
406          --errorCount;
407        } finally {
408          dosValue.close();
409        }
410      } finally {
411        state = State.READY;
412      }
413    }
414
415    /**
416     * Helper class to register key after close call on key append stream.
417     */
418    private class KeyRegister extends DataOutputStream {
419      private final int expectedLength;
420      private boolean closed = false;
421
422      public KeyRegister(int len) {
423        super(currentKeyBufferOS);
424        if (len >= 0) {
425          currentKeyBufferOS.reset(len);
426        } else {
427          currentKeyBufferOS.reset();
428        }
429        expectedLength = len;
430      }
431
432      @Override
433      public void close() throws IOException {
434        if (closed == true) {
435          return;
436        }
437
438        try {
439          ++errorCount;
440          byte[] key = currentKeyBufferOS.getBuffer();
441          int len = currentKeyBufferOS.size();
442          /**
443           * verify length.
444           */
445          if (expectedLength >= 0 && expectedLength != len) {
446            throw new IOException("Incorrect key length: expected="
447                + expectedLength + " actual=" + len);
448          }
449
450          Utils.writeVInt(blkAppender, len);
451          blkAppender.write(key, 0, len);
452          if (tfileIndex.getFirstKey() == null) {
453            tfileIndex.setFirstKey(key, 0, len);
454          }
455
456          if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) {
457            byte[] lastKey = lastKeyBufferOS.getBuffer();
458            int lastLen = lastKeyBufferOS.size();
459            if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0,
460                lastLen) < 0) {
461              throw new IOException("Keys are not added in sorted order");
462            }
463          }
464
465          BoundedByteArrayOutputStream tmp = currentKeyBufferOS;
466          currentKeyBufferOS = lastKeyBufferOS;
467          lastKeyBufferOS = tmp;
468          --errorCount;
469        } finally {
470          closed = true;
471          state = State.END_KEY;
472        }
473      }
474    }
475
476    /**
477     * Helper class to register value after close call on value append stream.
478     */
479    private class ValueRegister extends DataOutputStream {
480      private boolean closed = false;
481
482      public ValueRegister(OutputStream os) {
483        super(os);
484      }
485
486      // Avoiding flushing call to down stream.
487      @Override
488      public void flush() {
489        // do nothing
490      }
491
492      @Override
493      public void close() throws IOException {
494        if (closed == true) {
495          return;
496        }
497
498        try {
499          ++errorCount;
500          super.close();
501          blkRecordCount++;
502          // bump up the total record count in the whole file
503          tfileMeta.incRecordCount();
504          finishDataBlock(false);
505          --errorCount;
506        } finally {
507          closed = true;
508          state = State.READY;
509        }
510      }
511    }
512
513    /**
514     * Obtain an output stream for writing a key into TFile. This may only be
515     * called when there is no active Key appending stream or value appending
516     * stream.
517     * 
518     * @param length
519     *          The expected length of the key. If length of the key is not
520     *          known, set length = -1. Otherwise, the application must write
521     *          exactly as many bytes as specified here before calling close on
522     *          the returned output stream.
523     * @return The key appending output stream.
524     * @throws IOException
525     * 
526     */
527    public DataOutputStream prepareAppendKey(int length) throws IOException {
528      if (state != State.READY) {
529        throw new IllegalStateException("Incorrect state to start a new key: "
530            + state.name());
531      }
532
533      initDataBlock();
534      DataOutputStream ret = new KeyRegister(length);
535      state = State.IN_KEY;
536      return ret;
537    }
538
539    /**
540     * Obtain an output stream for writing a value into TFile. This may only be
541     * called right after a key appending operation (the key append stream must
542     * be closed).
543     * 
544     * @param length
545     *          The expected length of the value. If length of the value is not
546     *          known, set length = -1. Otherwise, the application must write
547     *          exactly as many bytes as specified here before calling close on
548     *          the returned output stream. Advertising the value size up-front
549     *          guarantees that the value is encoded in one chunk, and avoids
550     *          intermediate chunk buffering.
551     * @throws IOException
552     * 
553     */
554    public DataOutputStream prepareAppendValue(int length) throws IOException {
555      if (state != State.END_KEY) {
556        throw new IllegalStateException(
557            "Incorrect state to start a new value: " + state.name());
558      }
559
560      DataOutputStream ret;
561
562      // unknown length
563      if (length < 0) {
564        if (valueBuffer == null) {
565          valueBuffer = new byte[getChunkBufferSize(conf)];
566        }
567        ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer));
568      } else {
569        ret =
570            new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length));
571      }
572
573      state = State.IN_VALUE;
574      return ret;
575    }
576
577    /**
578     * Obtain an output stream for creating a meta block. This function may not
579     * be called when there is a key append stream or value append stream
580     * active. No more key-value insertion is allowed after a meta data block
581     * has been added to TFile.
582     * 
583     * @param name
584     *          Name of the meta block.
585     * @param compressName
586     *          Name of the compression algorithm to be used. Must be one of the
587     *          strings returned by
588     *          {@link TFile#getSupportedCompressionAlgorithms()}.
589     * @return A DataOutputStream that can be used to write Meta Block data.
590     *         Closing the stream would signal the ending of the block.
591     * @throws IOException
592     * @throws MetaBlockAlreadyExists
593     *           the Meta Block with the same name already exists.
594     */
595    public DataOutputStream prepareMetaBlock(String name, String compressName)
596        throws IOException, MetaBlockAlreadyExists {
597      if (state != State.READY) {
598        throw new IllegalStateException(
599            "Incorrect state to start a Meta Block: " + state.name());
600      }
601
602      finishDataBlock(true);
603      DataOutputStream outputStream =
604          writerBCF.prepareMetaBlock(name, compressName);
605      return outputStream;
606    }
607
608    /**
609     * Obtain an output stream for creating a meta block. This function may not
610     * be called when there is a key append stream or value append stream
611     * active. No more key-value insertion is allowed after a meta data block
612     * has been added to TFile. Data will be compressed using the default
613     * compressor as defined in Writer's constructor.
614     * 
615     * @param name
616     *          Name of the meta block.
617     * @return A DataOutputStream that can be used to write Meta Block data.
618     *         Closing the stream would signal the ending of the block.
619     * @throws IOException
620     * @throws MetaBlockAlreadyExists
621     *           the Meta Block with the same name already exists.
622     */
623    public DataOutputStream prepareMetaBlock(String name) throws IOException,
624        MetaBlockAlreadyExists {
625      if (state != State.READY) {
626        throw new IllegalStateException(
627            "Incorrect state to start a Meta Block: " + state.name());
628      }
629
630      finishDataBlock(true);
631      return writerBCF.prepareMetaBlock(name);
632    }
633
634    /**
635     * Check if we need to start a new data block.
636     * 
637     * @throws IOException
638     */
639    private void initDataBlock() throws IOException {
640      // for each new block, get a new appender
641      if (blkAppender == null) {
642        blkAppender = writerBCF.prepareDataBlock();
643      }
644    }
645
646    /**
647     * Close the current data block if necessary.
648     * 
649     * @param bForceFinish
650     *          Force the closure regardless of the block size.
651     * @throws IOException
652     */
653    void finishDataBlock(boolean bForceFinish) throws IOException {
654      if (blkAppender == null) {
655        return;
656      }
657
658      // exceeded the size limit, do the compression and finish the block
659      if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) {
660        // keep tracks of the last key of each data block, no padding
661        // for now
662        TFileIndexEntry keyLast =
663            new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS
664                .size(), blkRecordCount);
665        tfileIndex.addEntry(keyLast);
666        // close the appender
667        blkAppender.close();
668        blkAppender = null;
669        blkRecordCount = 0;
670      }
671    }
672  }
673
  /**
   * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner
   * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}
   * ), a portion of TFile based on byte offsets (
   * {@link Reader#createScannerByByteRange(long, long)}), or a portion of TFile whose keys
   * fall in a certain key range (for sorted TFile only,
   * {@link Reader#createScannerByKey(byte[], byte[])} or
   * {@link Reader#createScannerByKey(RawComparable, RawComparable)}).
   */
683  @InterfaceStability.Evolving
684  public static class Reader implements Closeable {
685    // The underlying BCFile reader.
686    final BCFile.Reader readerBCF;
687
688    // TFile index, it is loaded lazily.
689    TFileIndex tfileIndex = null;
690    final TFileMeta tfileMeta;
691    final BytesComparator comparator;
692
693    // global begin and end locations.
694    private final Location begin;
695    private final Location end;
696
697    /**
698     * Location representing a virtual position in the TFile.
699     */
700    static final class Location implements Comparable<Location>, Cloneable {
701      private int blockIndex;
702      // distance/offset from the beginning of the block
703      private long recordIndex;
704
705      Location(int blockIndex, long recordIndex) {
706        set(blockIndex, recordIndex);
707      }
708
709      void incRecordIndex() {
710        ++recordIndex;
711      }
712
713      Location(Location other) {
714        set(other);
715      }
716
717      int getBlockIndex() {
718        return blockIndex;
719      }
720
721      long getRecordIndex() {
722        return recordIndex;
723      }
724
725      void set(int blockIndex, long recordIndex) {
726        if ((blockIndex | recordIndex) < 0) {
727          throw new IllegalArgumentException(
728              "Illegal parameter for BlockLocation.");
729        }
730        this.blockIndex = blockIndex;
731        this.recordIndex = recordIndex;
732      }
733
734      void set(Location other) {
735        set(other.blockIndex, other.recordIndex);
736      }
737
738      /**
739       * @see java.lang.Comparable#compareTo(java.lang.Object)
740       */
741      @Override
742      public int compareTo(Location other) {
743        return compareTo(other.blockIndex, other.recordIndex);
744      }
745
746      int compareTo(int bid, long rid) {
747        if (this.blockIndex == bid) {
748          long ret = this.recordIndex - rid;
749          if (ret > 0) return 1;
750          if (ret < 0) return -1;
751          return 0;
752        }
753        return this.blockIndex - bid;
754      }
755
756      /**
757       * @see java.lang.Object#clone()
758       */
759      @Override
760      protected Location clone() {
761        return new Location(blockIndex, recordIndex);
762      }
763
764      /**
765       * @see java.lang.Object#hashCode()
766       */
767      @Override
768      public int hashCode() {
769        final int prime = 31;
770        int result = prime + blockIndex;
771        result = (int) (prime * result + recordIndex);
772        return result;
773      }
774
775      /**
776       * @see java.lang.Object#equals(java.lang.Object)
777       */
778      @Override
779      public boolean equals(Object obj) {
780        if (this == obj) return true;
781        if (obj == null) return false;
782        if (getClass() != obj.getClass()) return false;
783        Location other = (Location) obj;
784        if (blockIndex != other.blockIndex) return false;
785        if (recordIndex != other.recordIndex) return false;
786        return true;
787      }
788    }
789
790    /**
791     * Constructor
792     * 
793     * @param fsdis
794     *          FS input stream of the TFile.
795     * @param fileLength
796     *          The length of TFile. This is required because we have no easy
797     *          way of knowing the actual size of the input file through the
798     *          File input stream.
799     * @param conf
800     * @throws IOException
801     */
802    public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf)
803        throws IOException {
804      readerBCF = new BCFile.Reader(fsdis, fileLength, conf);
805
806      // first, read TFile meta
807      BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME);
808      try {
809        tfileMeta = new TFileMeta(brMeta);
810      } finally {
811        brMeta.close();
812      }
813
814      comparator = tfileMeta.getComparator();
815      // Set begin and end locations.
816      begin = new Location(0, 0);
817      end = new Location(readerBCF.getBlockCount(), 0);
818    }
819
820    /**
821     * Close the reader. The state of the Reader object is undefined after
822     * close. Calling close() for multiple times has no effect.
823     */
824    @Override
825    public void close() throws IOException {
826      readerBCF.close();
827    }
828
829    /**
830     * Get the begin location of the TFile.
831     * 
832     * @return If TFile is not empty, the location of the first key-value pair.
833     *         Otherwise, it returns end().
834     */
835    Location begin() {
836      return begin;
837    }
838
839    /**
840     * Get the end location of the TFile.
841     * 
842     * @return The location right after the last key-value pair in TFile.
843     */
844    Location end() {
845      return end;
846    }
847
848    /**
849     * Get the string representation of the comparator.
850     * 
851     * @return If the TFile is not sorted by keys, an empty string will be
852     *         returned. Otherwise, the actual comparator string that is
853     *         provided during the TFile creation time will be returned.
854     */
855    public String getComparatorName() {
856      return tfileMeta.getComparatorString();
857    }
858
859    /**
860     * Is the TFile sorted?
861     * 
862     * @return true if TFile is sorted.
863     */
864    public boolean isSorted() {
865      return tfileMeta.isSorted();
866    }
867
868    /**
869     * Get the number of key-value pair entries in TFile.
870     * 
871     * @return the number of key-value pairs in TFile
872     */
873    public long getEntryCount() {
874      return tfileMeta.getRecordCount();
875    }
876
877    /**
878     * Lazily loading the TFile index.
879     * 
880     * @throws IOException
881     */
882    synchronized void checkTFileDataIndex() throws IOException {
883      if (tfileIndex == null) {
884        BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME);
885        try {
886          tfileIndex =
887              new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta
888                  .getComparator());
889        } finally {
890          brIndex.close();
891        }
892      }
893    }
894
895    /**
896     * Get the first key in the TFile.
897     * 
898     * @return The first key in the TFile.
899     * @throws IOException
900     */
901    public RawComparable getFirstKey() throws IOException {
902      checkTFileDataIndex();
903      return tfileIndex.getFirstKey();
904    }
905
906    /**
907     * Get the last key in the TFile.
908     * 
909     * @return The last key in the TFile.
910     * @throws IOException
911     */
912    public RawComparable getLastKey() throws IOException {
913      checkTFileDataIndex();
914      return tfileIndex.getLastKey();
915    }
916
917    /**
918     * Get a Comparator object to compare Entries. It is useful when you want
919     * stores the entries in a collection (such as PriorityQueue) and perform
920     * sorting or comparison among entries based on the keys without copying out
921     * the key.
922     * 
923     * @return An Entry Comparator..
924     */
925    public Comparator<Scanner.Entry> getEntryComparator() {
926      if (!isSorted()) {
927        throw new RuntimeException(
928            "Entries are not comparable for unsorted TFiles");
929      }
930
931      return new Comparator<Scanner.Entry>() {
932        /**
933         * Provide a customized comparator for Entries. This is useful if we
934         * have a collection of Entry objects. However, if the Entry objects
935         * come from different TFiles, users must ensure that those TFiles share
936         * the same RawComparator.
937         */
938        @Override
939        public int compare(Scanner.Entry o1, Scanner.Entry o2) {
940          return comparator.compare(o1.getKeyBuffer(), 0, o1.getKeyLength(), o2
941              .getKeyBuffer(), 0, o2.getKeyLength());
942        }
943      };
944    }
945
946    /**
947     * Get an instance of the RawComparator that is constructed based on the
948     * string comparator representation.
949     * 
950     * @return a Comparator that can compare RawComparable's.
951     */
952    public Comparator<RawComparable> getComparator() {
953      return comparator;
954    }
955
956    /**
957     * Stream access to a meta block.``
958     * 
959     * @param name
960     *          The name of the meta block.
961     * @return The input stream.
962     * @throws IOException
963     *           on I/O error.
964     * @throws MetaBlockDoesNotExist
965     *           If the meta block with the name does not exist.
966     */
967    public DataInputStream getMetaBlock(String name) throws IOException,
968        MetaBlockDoesNotExist {
969      return readerBCF.getMetaBlock(name);
970    }
971
972    /**
973     * if greater is true then returns the beginning location of the block
974     * containing the key strictly greater than input key. if greater is false
975     * then returns the beginning location of the block greater than equal to
976     * the input key
977     * 
978     * @param key
979     *          the input key
980     * @param greater
981     *          boolean flag
982     * @return
983     * @throws IOException
984     */
985    Location getBlockContainsKey(RawComparable key, boolean greater)
986        throws IOException {
987      if (!isSorted()) {
988        throw new RuntimeException("Seeking in unsorted TFile");
989      }
990      checkTFileDataIndex();
991      int blkIndex =
992          (greater) ? tfileIndex.upperBound(key) : tfileIndex.lowerBound(key);
993      if (blkIndex < 0) return end;
994      return new Location(blkIndex, 0);
995    }
996
997    Location getLocationByRecordNum(long recNum) throws IOException {
998      checkTFileDataIndex();
999      return tfileIndex.getLocationByRecordNum(recNum);
1000    }
1001
1002    long getRecordNumByLocation(Location location) throws IOException {
1003      checkTFileDataIndex();
1004      return tfileIndex.getRecordNumByLocation(location);      
1005    }
1006    
1007    int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
1008      if (!isSorted()) {
1009        throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
1010      }
1011      return comparator.compare(a, o1, l1, b, o2, l2);
1012    }
1013
1014    int compareKeys(RawComparable a, RawComparable b) {
1015      if (!isSorted()) {
1016        throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
1017      }
1018      return comparator.compare(a, b);
1019    }
1020
1021    /**
1022     * Get the location pointing to the beginning of the first key-value pair in
1023     * a compressed block whose byte offset in the TFile is greater than or
1024     * equal to the specified offset.
1025     * 
1026     * @param offset
1027     *          the user supplied offset.
1028     * @return the location to the corresponding entry; or end() if no such
1029     *         entry exists.
1030     */
1031    Location getLocationNear(long offset) {
1032      int blockIndex = readerBCF.getBlockIndexNear(offset);
1033      if (blockIndex == -1) return end;
1034      return new Location(blockIndex, 0);
1035    }
1036
1037    /**
1038     * Get the RecordNum for the first key-value pair in a compressed block
1039     * whose byte offset in the TFile is greater than or equal to the specified
1040     * offset.
1041     * 
1042     * @param offset
1043     *          the user supplied offset.
1044     * @return the RecordNum to the corresponding entry. If no such entry
1045     *         exists, it returns the total entry count.
1046     * @throws IOException
1047     */
1048    public long getRecordNumNear(long offset) throws IOException {
1049      return getRecordNumByLocation(getLocationNear(offset));
1050    }
1051    
1052    /**
1053     * Get a sample key that is within a block whose starting offset is greater
1054     * than or equal to the specified offset.
1055     * 
1056     * @param offset
1057     *          The file offset.
1058     * @return the key that fits the requirement; or null if no such key exists
1059     *         (which could happen if the offset is close to the end of the
1060     *         TFile).
1061     * @throws IOException
1062     */
1063    public RawComparable getKeyNear(long offset) throws IOException {
1064      int blockIndex = readerBCF.getBlockIndexNear(offset);
1065      if (blockIndex == -1) return null;
1066      checkTFileDataIndex();
1067      return new ByteArray(tfileIndex.getEntry(blockIndex).key);
1068    }
1069
1070    /**
1071     * Get a scanner than can scan the whole TFile.
1072     * 
1073     * @return The scanner object. A valid Scanner is always returned even if
1074     *         the TFile is empty.
1075     * @throws IOException
1076     */
1077    public Scanner createScanner() throws IOException {
1078      return new Scanner(this, begin, end);
1079    }
1080
1081    /**
1082     * Get a scanner that covers a portion of TFile based on byte offsets.
1083     * 
1084     * @param offset
1085     *          The beginning byte offset in the TFile.
1086     * @param length
1087     *          The length of the region.
1088     * @return The actual coverage of the returned scanner tries to match the
1089     *         specified byte-region but always round up to the compression
1090     *         block boundaries. It is possible that the returned scanner
1091     *         contains zero key-value pairs even if length is positive.
1092     * @throws IOException
1093     */
1094    public Scanner createScannerByByteRange(long offset, long length) throws IOException {
1095      return new Scanner(this, offset, offset + length);
1096    }
1097
1098    /**
1099     * Get a scanner that covers a portion of TFile based on keys.
1100     * 
1101     * @param beginKey
1102     *          Begin key of the scan (inclusive). If null, scan from the first
1103     *          key-value entry of the TFile.
1104     * @param endKey
1105     *          End key of the scan (exclusive). If null, scan up to the last
1106     *          key-value entry of the TFile.
1107     * @return The actual coverage of the returned scanner will cover all keys
1108     *         greater than or equal to the beginKey and less than the endKey.
1109     * @throws IOException
1110     * 
1111     * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead.
1112     */
1113    @Deprecated
1114    public Scanner createScanner(byte[] beginKey, byte[] endKey)
1115      throws IOException {
1116      return createScannerByKey(beginKey, endKey);
1117    }
1118    
1119    /**
1120     * Get a scanner that covers a portion of TFile based on keys.
1121     * 
1122     * @param beginKey
1123     *          Begin key of the scan (inclusive). If null, scan from the first
1124     *          key-value entry of the TFile.
1125     * @param endKey
1126     *          End key of the scan (exclusive). If null, scan up to the last
1127     *          key-value entry of the TFile.
1128     * @return The actual coverage of the returned scanner will cover all keys
1129     *         greater than or equal to the beginKey and less than the endKey.
1130     * @throws IOException
1131     */
1132    public Scanner createScannerByKey(byte[] beginKey, byte[] endKey)
1133        throws IOException {
1134      return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey,
1135          0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
1136          0, endKey.length));
1137    }
1138
1139    /**
1140     * Get a scanner that covers a specific key range.
1141     * 
1142     * @param beginKey
1143     *          Begin key of the scan (inclusive). If null, scan from the first
1144     *          key-value entry of the TFile.
1145     * @param endKey
1146     *          End key of the scan (exclusive). If null, scan up to the last
1147     *          key-value entry of the TFile.
1148     * @return The actual coverage of the returned scanner will cover all keys
1149     *         greater than or equal to the beginKey and less than the endKey.
1150     * @throws IOException
1151     * 
1152     * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)}
1153     *             instead.
1154     */
1155    @Deprecated
1156    public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
1157        throws IOException {
1158      return createScannerByKey(beginKey, endKey);
1159    }
1160
1161    /**
1162     * Get a scanner that covers a specific key range.
1163     * 
1164     * @param beginKey
1165     *          Begin key of the scan (inclusive). If null, scan from the first
1166     *          key-value entry of the TFile.
1167     * @param endKey
1168     *          End key of the scan (exclusive). If null, scan up to the last
1169     *          key-value entry of the TFile.
1170     * @return The actual coverage of the returned scanner will cover all keys
1171     *         greater than or equal to the beginKey and less than the endKey.
1172     * @throws IOException
1173     */
1174    public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey)
1175        throws IOException {
1176      if ((beginKey != null) && (endKey != null)
1177          && (compareKeys(beginKey, endKey) >= 0)) {
1178        return new Scanner(this, beginKey, beginKey);
1179      }
1180      return new Scanner(this, beginKey, endKey);
1181    }
1182
1183    /**
1184     * Create a scanner that covers a range of records.
1185     * 
1186     * @param beginRecNum
1187     *          The RecordNum for the first record (inclusive).
1188     * @param endRecNum
1189     *          The RecordNum for the last record (exclusive). To scan the whole
1190     *          file, either specify endRecNum==-1 or endRecNum==getEntryCount().
1191     * @return The TFile scanner that covers the specified range of records.
1192     * @throws IOException
1193     */
1194    public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum)
1195        throws IOException {
1196      if (beginRecNum < 0) beginRecNum = 0;
1197      if (endRecNum < 0 || endRecNum > getEntryCount()) {
1198        endRecNum = getEntryCount();
1199      }
1200      return new Scanner(this, getLocationByRecordNum(beginRecNum),
1201          getLocationByRecordNum(endRecNum));
1202    }
1203
1204    /**
1205     * The TFile Scanner. The Scanner has an implicit cursor, which, upon
1206     * creation, points to the first key-value pair in the scan range. If the
1207     * scan range is empty, the cursor will point to the end of the scan range.
1208     * <p>
1209     * Use {@link Scanner#atEnd()} to test whether the cursor is at the end
1210     * location of the scanner.
1211     * <p>
1212     * Use {@link Scanner#advance()} to move the cursor to the next key-value
1213     * pair (or end if none exists). Use seekTo methods (
1214     * {@link Scanner#seekTo(byte[])} or
1215     * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary
1216     * location in the covered range (including backward seeking). Use
1217     * {@link Scanner#rewind()} to seek back to the beginning of the scanner.
1218     * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner.
1219     * <p>
1220     * Actual keys and values may be obtained through {@link Scanner.Entry}
1221     * object, which is obtained through {@link Scanner#entry()}.
1222     */
1223    public static class Scanner implements Closeable {
1224      // The underlying TFile reader.
1225      final Reader reader;
1226      // current block (null if reaching end)
1227      private BlockReader blkReader;
1228
1229      Location beginLocation;
1230      Location endLocation;
1231      Location currentLocation;
1232
1233      // flag to ensure value is only examined once.
1234      boolean valueChecked = false;
1235      // reusable buffer for keys.
1236      final byte[] keyBuffer;
1237      // length of key, -1 means key is invalid.
1238      int klen = -1;
1239
1240      static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024;
1241      BytesWritable valTransferBuffer;
1242
1243      DataInputBuffer keyDataInputStream;
1244      ChunkDecoder valueBufferInputStream;
1245      DataInputStream valueDataInputStream;
1246      // vlen == -1 if unknown.
1247      int vlen;
1248
1249      /**
1250       * Constructor
1251       * 
1252       * @param reader
1253       *          The TFile reader object.
1254       * @param offBegin
1255       *          Begin byte-offset of the scan.
1256       * @param offEnd
1257       *          End byte-offset of the scan.
1258       * @throws IOException
1259       * 
1260       *           The offsets will be rounded to the beginning of a compressed
1261       *           block whose offset is greater than or equal to the specified
1262       *           offset.
1263       */
1264      protected Scanner(Reader reader, long offBegin, long offEnd)
1265          throws IOException {
1266        this(reader, reader.getLocationNear(offBegin), reader
1267            .getLocationNear(offEnd));
1268      }
1269
1270      /**
1271       * Constructor
1272       * 
1273       * @param reader
1274       *          The TFile reader object.
1275       * @param begin
1276       *          Begin location of the scan.
1277       * @param end
1278       *          End location of the scan.
1279       * @throws IOException
1280       */
1281      Scanner(Reader reader, Location begin, Location end) throws IOException {
1282        this.reader = reader;
1283        // ensure the TFile index is loaded throughout the life of scanner.
1284        reader.checkTFileDataIndex();
1285        beginLocation = begin;
1286        endLocation = end;
1287
1288        valTransferBuffer = new BytesWritable();
1289        // TODO: remember the longest key in a TFile, and use it to replace
1290        // MAX_KEY_SIZE.
1291        keyBuffer = new byte[MAX_KEY_SIZE];
1292        keyDataInputStream = new DataInputBuffer();
1293        valueBufferInputStream = new ChunkDecoder();
1294        valueDataInputStream = new DataInputStream(valueBufferInputStream);
1295
1296        if (beginLocation.compareTo(endLocation) >= 0) {
1297          currentLocation = new Location(endLocation);
1298        } else {
1299          currentLocation = new Location(0, 0);
1300          initBlock(beginLocation.getBlockIndex());
1301          inBlockAdvance(beginLocation.getRecordIndex());
1302        }
1303      }
1304
1305      /**
1306       * Constructor
1307       * 
1308       * @param reader
1309       *          The TFile reader object.
1310       * @param beginKey
1311       *          Begin key of the scan. If null, scan from the first <K,V>
1312       *          entry of the TFile.
1313       * @param endKey
1314       *          End key of the scan. If null, scan up to the last <K, V> entry
1315       *          of the TFile.
1316       * @throws IOException
1317       */
1318      protected Scanner(Reader reader, RawComparable beginKey,
1319          RawComparable endKey) throws IOException {
1320        this(reader, (beginKey == null) ? reader.begin() : reader
1321            .getBlockContainsKey(beginKey, false), reader.end());
1322        if (beginKey != null) {
1323          inBlockAdvance(beginKey, false);
1324          beginLocation.set(currentLocation);
1325        }
1326        if (endKey != null) {
1327          seekTo(endKey, false);
1328          endLocation.set(currentLocation);
1329          seekTo(beginLocation);
1330        }
1331      }
1332
1333      /**
1334       * Move the cursor to the first entry whose key is greater than or equal
1335       * to the input key. Synonymous to seekTo(key, 0, key.length). The entry
1336       * returned by the previous entry() call will be invalid.
1337       * 
1338       * @param key
1339       *          The input key
1340       * @return true if we find an equal key.
1341       * @throws IOException
1342       */
1343      public boolean seekTo(byte[] key) throws IOException {
1344        return seekTo(key, 0, key.length);
1345      }
1346
1347      /**
1348       * Move the cursor to the first entry whose key is greater than or equal
1349       * to the input key. The entry returned by the previous entry() call will
1350       * be invalid.
1351       * 
1352       * @param key
1353       *          The input key
1354       * @param keyOffset
1355       *          offset in the key buffer.
1356       * @param keyLen
1357       *          key buffer length.
1358       * @return true if we find an equal key; false otherwise.
1359       * @throws IOException
1360       */
1361      public boolean seekTo(byte[] key, int keyOffset, int keyLen)
1362          throws IOException {
1363        return seekTo(new ByteArray(key, keyOffset, keyLen), false);
1364      }
1365
1366      private boolean seekTo(RawComparable key, boolean beyond)
1367          throws IOException {
1368        Location l = reader.getBlockContainsKey(key, beyond);
1369        if (l.compareTo(beginLocation) < 0) {
1370          l = beginLocation;
1371        } else if (l.compareTo(endLocation) >= 0) {
1372          seekTo(endLocation);
1373          return false;
1374        }
1375
1376        // check if what we are seeking is in the later part of the current
1377        // block.
1378        if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex())
1379            || (compareCursorKeyTo(key) >= 0)) {
1380          // sorry, we must seek to a different location first.
1381          seekTo(l);
1382        }
1383
1384        return inBlockAdvance(key, beyond);
1385      }
1386
1387      /**
1388       * Move the cursor to the new location. The entry returned by the previous
1389       * entry() call will be invalid.
1390       * 
1391       * @param l
1392       *          new cursor location. It must fall between the begin and end
1393       *          location of the scanner.
1394       * @throws IOException
1395       */
1396      private void seekTo(Location l) throws IOException {
1397        if (l.compareTo(beginLocation) < 0) {
1398          throw new IllegalArgumentException(
1399              "Attempt to seek before the begin location.");
1400        }
1401
1402        if (l.compareTo(endLocation) > 0) {
1403          throw new IllegalArgumentException(
1404              "Attempt to seek after the end location.");
1405        }
1406
1407        if (l.compareTo(endLocation) == 0) {
1408          parkCursorAtEnd();
1409          return;
1410        }
1411
1412        if (l.getBlockIndex() != currentLocation.getBlockIndex()) {
1413          // going to a totally different block
1414          initBlock(l.getBlockIndex());
1415        } else {
1416          if (valueChecked) {
1417            // may temporarily go beyond the last record in the block (in which
1418            // case the next if loop will always be true).
1419            inBlockAdvance(1);
1420          }
1421          if (l.getRecordIndex() < currentLocation.getRecordIndex()) {
1422            initBlock(l.getBlockIndex());
1423          }
1424        }
1425
1426        inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex());
1427
1428        return;
1429      }
1430
1431      /**
1432       * Rewind to the first entry in the scanner. The entry returned by the
1433       * previous entry() call will be invalid.
1434       * 
1435       * @throws IOException
1436       */
1437      public void rewind() throws IOException {
1438        seekTo(beginLocation);
1439      }
1440
1441      /**
1442       * Seek to the end of the scanner. The entry returned by the previous
1443       * entry() call will be invalid.
1444       * 
1445       * @throws IOException
1446       */
1447      public void seekToEnd() throws IOException {
1448        parkCursorAtEnd();
1449      }
1450
1451      /**
1452       * Move the cursor to the first entry whose key is greater than or equal
1453       * to the input key. Synonymous to lowerBound(key, 0, key.length). The
1454       * entry returned by the previous entry() call will be invalid.
1455       * 
1456       * @param key
1457       *          The input key
1458       * @throws IOException
1459       */
1460      public void lowerBound(byte[] key) throws IOException {
1461        lowerBound(key, 0, key.length);
1462      }
1463
1464      /**
1465       * Move the cursor to the first entry whose key is greater than or equal
1466       * to the input key. The entry returned by the previous entry() call will
1467       * be invalid.
1468       * 
1469       * @param key
1470       *          The input key
1471       * @param keyOffset
1472       *          offset in the key buffer.
1473       * @param keyLen
1474       *          key buffer length.
1475       * @throws IOException
1476       */
1477      public void lowerBound(byte[] key, int keyOffset, int keyLen)
1478          throws IOException {
1479        seekTo(new ByteArray(key, keyOffset, keyLen), false);
1480      }
1481
1482      /**
1483       * Move the cursor to the first entry whose key is strictly greater than
1484       * the input key. Synonymous to upperBound(key, 0, key.length). The entry
1485       * returned by the previous entry() call will be invalid.
1486       * 
1487       * @param key
1488       *          The input key
1489       * @throws IOException
1490       */
1491      public void upperBound(byte[] key) throws IOException {
1492        upperBound(key, 0, key.length);
1493      }
1494
1495      /**
1496       * Move the cursor to the first entry whose key is strictly greater than
1497       * the input key. The entry returned by the previous entry() call will be
1498       * invalid.
1499       * 
1500       * @param key
1501       *          The input key
1502       * @param keyOffset
1503       *          offset in the key buffer.
1504       * @param keyLen
1505       *          key buffer length.
1506       * @throws IOException
1507       */
1508      public void upperBound(byte[] key, int keyOffset, int keyLen)
1509          throws IOException {
1510        seekTo(new ByteArray(key, keyOffset, keyLen), true);
1511      }
1512
1513      /**
1514       * Move the cursor to the next key-value pair. The entry returned by the
1515       * previous entry() call will be invalid.
1516       * 
1517       * @return true if the cursor successfully moves. False when cursor is
1518       *         already at the end location and cannot be advanced.
1519       * @throws IOException
1520       */
1521      public boolean advance() throws IOException {
1522        if (atEnd()) {
1523          return false;
1524        }
1525
1526        int curBid = currentLocation.getBlockIndex();
1527        long curRid = currentLocation.getRecordIndex();
1528        long entriesInBlock = reader.getBlockEntryCount(curBid);
1529        if (curRid + 1 >= entriesInBlock) {
1530          if (endLocation.compareTo(curBid + 1, 0) <= 0) {
1531            // last entry in TFile.
1532            parkCursorAtEnd();
1533          } else {
1534            // last entry in Block.
1535            initBlock(curBid + 1);
1536          }
1537        } else {
1538          inBlockAdvance(1);
1539        }
1540        return true;
1541      }
1542
1543      /**
1544       * Load a compressed block for reading. Expecting blockIndex is valid.
1545       * 
1546       * @throws IOException
1547       */
1548      private void initBlock(int blockIndex) throws IOException {
1549        klen = -1;
1550        if (blkReader != null) {
1551          try {
1552            blkReader.close();
1553          } finally {
1554            blkReader = null;
1555          }
1556        }
1557        blkReader = reader.getBlockReader(blockIndex);
1558        currentLocation.set(blockIndex, 0);
1559      }
1560
1561      private void parkCursorAtEnd() throws IOException {
1562        klen = -1;
1563        currentLocation.set(endLocation);
1564        if (blkReader != null) {
1565          try {
1566            blkReader.close();
1567          } finally {
1568            blkReader = null;
1569          }
1570        }
1571      }
1572
1573      /**
1574       * Close the scanner. Release all resources. The behavior of using the
1575       * scanner after calling close is not defined. The entry returned by the
1576       * previous entry() call will be invalid.
1577       */
1578      @Override
1579      public void close() throws IOException {
1580        parkCursorAtEnd();
1581      }
1582
1583      /**
1584       * Is cursor at the end location?
1585       * 
1586       * @return true if the cursor is at the end location.
1587       */
1588      public boolean atEnd() {
1589        return (currentLocation.compareTo(endLocation) >= 0);
1590      }
1591
1592      /**
1593       * check whether we have already successfully obtained the key. It also
1594       * initializes the valueInputStream.
1595       */
1596      void checkKey() throws IOException {
1597        if (klen >= 0) return;
1598        if (atEnd()) {
1599          throw new EOFException("No key-value to read");
1600        }
1601        klen = -1;
1602        vlen = -1;
1603        valueChecked = false;
1604
1605        klen = Utils.readVInt(blkReader);
1606        blkReader.readFully(keyBuffer, 0, klen);
1607        valueBufferInputStream.reset(blkReader);
1608        if (valueBufferInputStream.isLastChunk()) {
1609          vlen = valueBufferInputStream.getRemain();
1610        }
1611      }
1612
1613      /**
1614       * Get an entry to access the key and value.
1615       * 
1616       * @return The Entry object to access the key and value.
1617       * @throws IOException
1618       */
1619      public Entry entry() throws IOException {
1620        checkKey();
1621        return new Entry();
1622      }
1623
1624      /**
1625       * Get the RecordNum corresponding to the entry pointed by the cursor.
1626       * @return The RecordNum corresponding to the entry pointed by the cursor.
1627       * @throws IOException
1628       */
1629      public long getRecordNum() throws IOException {
1630        return reader.getRecordNumByLocation(currentLocation);
1631      }
1632      
1633      /**
1634       * Internal API. Comparing the key at cursor to user-specified key.
1635       * 
1636       * @param other
1637       *          user-specified key.
1638       * @return negative if key at cursor is smaller than user key; 0 if equal;
1639       *         and positive if key at cursor greater than user key.
1640       * @throws IOException
1641       */
1642      int compareCursorKeyTo(RawComparable other) throws IOException {
1643        checkKey();
1644        return reader.compareKeys(keyBuffer, 0, klen, other.buffer(), other
1645            .offset(), other.size());
1646      }
1647
1648      /**
1649       * Entry to a &lt;Key, Value&gt; pair.
1650       */
1651      public class Entry implements Comparable<RawComparable> {
1652        /**
1653         * Get the length of the key.
1654         * 
1655         * @return the length of the key.
1656         */
1657        public int getKeyLength() {
1658          return klen;
1659        }
1660
1661        byte[] getKeyBuffer() {
1662          return keyBuffer;
1663        }
1664
1665        /**
1666         * Copy the key and value in one shot into BytesWritables. This is
1667         * equivalent to getKey(key); getValue(value);
1668         * 
1669         * @param key
1670         *          BytesWritable to hold key.
1671         * @param value
1672         *          BytesWritable to hold value
1673         * @throws IOException
1674         */
1675        public void get(BytesWritable key, BytesWritable value)
1676            throws IOException {
1677          getKey(key);
1678          getValue(value);
1679        }
1680
1681        /**
1682         * Copy the key into BytesWritable. The input BytesWritable will be
1683         * automatically resized to the actual key size.
1684         * 
1685         * @param key
1686         *          BytesWritable to hold the key.
1687         * @throws IOException
1688         */
1689        public int getKey(BytesWritable key) throws IOException {
1690          key.setSize(getKeyLength());
1691          getKey(key.getBytes());
1692          return key.getLength();
1693        }
1694
1695        /**
1696         * Copy the value into BytesWritable. The input BytesWritable will be
1697         * automatically resized to the actual value size. The implementation
1698         * directly uses the buffer inside BytesWritable for storing the value.
1699         * The call does not require the value length to be known.
1700         * 
1701         * @param value
1702         * @throws IOException
1703         */
1704        public long getValue(BytesWritable value) throws IOException {
1705          DataInputStream dis = getValueStream();
1706          int size = 0;
1707          try {
1708            int remain;
1709            while ((remain = valueBufferInputStream.getRemain()) > 0) {
1710              value.setSize(size + remain);
1711              dis.readFully(value.getBytes(), size, remain);
1712              size += remain;
1713            }
1714            return value.getLength();
1715          } finally {
1716            dis.close();
1717          }
1718        }
1719
1720        /**
1721         * Writing the key to the output stream. This method avoids copying key
1722         * buffer from Scanner into user buffer, then writing to the output
1723         * stream.
1724         * 
1725         * @param out
1726         *          The output stream
1727         * @return the length of the key.
1728         * @throws IOException
1729         */
1730        public int writeKey(OutputStream out) throws IOException {
1731          out.write(keyBuffer, 0, klen);
1732          return klen;
1733        }
1734
1735        /**
1736         * Writing the value to the output stream. This method avoids copying
1737         * value data from Scanner into user buffer, then writing to the output
1738         * stream. It does not require the value length to be known.
1739         * 
1740         * @param out
1741         *          The output stream
1742         * @return the length of the value
1743         * @throws IOException
1744         */
1745        public long writeValue(OutputStream out) throws IOException {
1746          DataInputStream dis = getValueStream();
1747          long size = 0;
1748          try {
1749            int chunkSize;
1750            while ((chunkSize = valueBufferInputStream.getRemain()) > 0) {
1751              chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE);
1752              valTransferBuffer.setSize(chunkSize);
1753              dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize);
1754              out.write(valTransferBuffer.getBytes(), 0, chunkSize);
1755              size += chunkSize;
1756            }
1757            return size;
1758          } finally {
1759            dis.close();
1760          }
1761        }
1762
1763        /**
1764         * Copy the key into user supplied buffer.
1765         * 
1766         * @param buf
1767         *          The buffer supplied by user. The length of the buffer must
1768         *          not be shorter than the key length.
1769         * @return The length of the key.
1770         * 
1771         * @throws IOException
1772         */
1773        public int getKey(byte[] buf) throws IOException {
1774          return getKey(buf, 0);
1775        }
1776
1777        /**
1778         * Copy the key into user supplied buffer.
1779         * 
1780         * @param buf
1781         *          The buffer supplied by user.
1782         * @param offset
1783         *          The starting offset of the user buffer where we should copy
1784         *          the key into. Requiring the key-length + offset no greater
1785         *          than the buffer length.
1786         * @return The length of the key.
1787         * @throws IOException
1788         */
1789        public int getKey(byte[] buf, int offset) throws IOException {
1790          if ((offset | (buf.length - offset - klen)) < 0) {
1791            throw new IndexOutOfBoundsException(
1792                "Buffer not enough to store the key");
1793          }
1794          System.arraycopy(keyBuffer, 0, buf, offset, klen);
1795          return klen;
1796        }
1797
1798        /**
1799         * Streaming access to the key. Useful for desrializing the key into
1800         * user objects.
1801         * 
1802         * @return The input stream.
1803         */
1804        public DataInputStream getKeyStream() {
1805          keyDataInputStream.reset(keyBuffer, klen);
1806          return keyDataInputStream;
1807        }
1808
1809        /**
1810         * Get the length of the value. isValueLengthKnown() must be tested
1811         * true.
1812         * 
1813         * @return the length of the value.
1814         */
1815        public int getValueLength() {
1816          if (vlen >= 0) {
1817            return vlen;
1818          }
1819
1820          throw new RuntimeException("Value length unknown.");
1821        }
1822
1823        /**
1824         * Copy value into user-supplied buffer. User supplied buffer must be
1825         * large enough to hold the whole value. The value part of the key-value
1826         * pair pointed by the current cursor is not cached and can only be
1827         * examined once. Calling any of the following functions more than once
1828         * without moving the cursor will result in exception:
1829         * {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
1830         * {@link #getValueStream}.
1831         * 
1832         * @return the length of the value. Does not require
1833         *         isValueLengthKnown() to be true.
1834         * @throws IOException
1835         * 
1836         */
1837        public int getValue(byte[] buf) throws IOException {
1838          return getValue(buf, 0);
1839        }
1840
1841        /**
1842         * Copy value into user-supplied buffer. User supplied buffer must be
1843         * large enough to hold the whole value (starting from the offset). The
1844         * value part of the key-value pair pointed by the current cursor is not
1845         * cached and can only be examined once. Calling any of the following
1846         * functions more than once without moving the cursor will result in
1847         * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
1848         * {@link #getValueStream}.
1849         * 
1850         * @return the length of the value. Does not require
1851         *         isValueLengthKnown() to be true.
1852         * @throws IOException
1853         */
1854        public int getValue(byte[] buf, int offset) throws IOException {
1855          DataInputStream dis = getValueStream();
1856          try {
1857            if (isValueLengthKnown()) {
1858              if ((offset | (buf.length - offset - vlen)) < 0) {
1859                throw new IndexOutOfBoundsException(
1860                    "Buffer too small to hold value");
1861              }
1862              dis.readFully(buf, offset, vlen);
1863              return vlen;
1864            }
1865
1866            int nextOffset = offset;
1867            while (nextOffset < buf.length) {
1868              int n = dis.read(buf, nextOffset, buf.length - nextOffset);
1869              if (n < 0) {
1870                break;
1871              }
1872              nextOffset += n;
1873            }
1874            if (dis.read() >= 0) {
1875              // attempt to read one more byte to determine whether we reached
1876              // the
1877              // end or not.
1878              throw new IndexOutOfBoundsException(
1879                  "Buffer too small to hold value");
1880            }
1881            return nextOffset - offset;
1882          } finally {
1883            dis.close();
1884          }
1885        }
1886
1887        /**
1888         * Stream access to value. The value part of the key-value pair pointed
1889         * by the current cursor is not cached and can only be examined once.
1890         * Calling any of the following functions more than once without moving
1891         * the cursor will result in exception: {@link #getValue(byte[])},
1892         * {@link #getValue(byte[], int)}, {@link #getValueStream}.
1893         * 
1894         * @return The input stream for reading the value.
1895         * @throws IOException
1896         */
1897        public DataInputStream getValueStream() throws IOException {
1898          if (valueChecked == true) {
1899            throw new IllegalStateException(
1900                "Attempt to examine value multiple times.");
1901          }
1902          valueChecked = true;
1903          return valueDataInputStream;
1904        }
1905
1906        /**
1907         * Check whether it is safe to call getValueLength().
1908         * 
1909         * @return true if value length is known before hand. Values less than
1910         *         the chunk size will always have their lengths known before
1911         *         hand. Values that are written out as a whole (with advertised
1912         *         length up-front) will always have their lengths known in
1913         *         read.
1914         */
1915        public boolean isValueLengthKnown() {
1916          return (vlen >= 0);
1917        }
1918
1919        /**
1920         * Compare the entry key to another key. Synonymous to compareTo(key, 0,
1921         * key.length).
1922         * 
1923         * @param buf
1924         *          The key buffer.
1925         * @return comparison result between the entry key with the input key.
1926         */
1927        public int compareTo(byte[] buf) {
1928          return compareTo(buf, 0, buf.length);
1929        }
1930
1931        /**
1932         * Compare the entry key to another key. Synonymous to compareTo(new
1933         * ByteArray(buf, offset, length)
1934         * 
1935         * @param buf
1936         *          The key buffer
1937         * @param offset
1938         *          offset into the key buffer.
1939         * @param length
1940         *          the length of the key.
1941         * @return comparison result between the entry key with the input key.
1942         */
1943        public int compareTo(byte[] buf, int offset, int length) {
1944          return compareTo(new ByteArray(buf, offset, length));
1945        }
1946
1947        /**
1948         * Compare an entry with a RawComparable object. This is useful when
1949         * Entries are stored in a collection, and we want to compare a user
1950         * supplied key.
1951         */
1952        @Override
1953        public int compareTo(RawComparable key) {
1954          return reader.compareKeys(keyBuffer, 0, getKeyLength(), key.buffer(),
1955              key.offset(), key.size());
1956        }
1957
1958        /**
1959         * Compare whether this and other points to the same key value.
1960         */
1961        @Override
1962        public boolean equals(Object other) {
1963          if (this == other) return true;
1964          if (!(other instanceof Entry)) return false;
1965          return ((Entry) other).compareTo(keyBuffer, 0, getKeyLength()) == 0;
1966        }
1967
1968        @Override
1969        public int hashCode() {
1970          return WritableComparator.hashBytes(keyBuffer, 0, getKeyLength());
1971        }
1972      }
1973
1974      /**
1975       * Advance cursor by n positions within the block.
1976       * 
1977       * @param n
1978       *          Number of key-value pairs to skip in block.
1979       * @throws IOException
1980       */
1981      private void inBlockAdvance(long n) throws IOException {
1982        for (long i = 0; i < n; ++i) {
1983          checkKey();
1984          if (!valueBufferInputStream.isClosed()) {
1985            valueBufferInputStream.close();
1986          }
1987          klen = -1;
1988          currentLocation.incRecordIndex();
1989        }
1990      }
1991
1992      /**
1993       * Advance cursor in block until we find a key that is greater than or
1994       * equal to the input key.
1995       * 
1996       * @param key
1997       *          Key to compare.
1998       * @param greater
1999       *          advance until we find a key greater than the input key.
2000       * @return true if we find a equal key.
2001       * @throws IOException
2002       */
2003      private boolean inBlockAdvance(RawComparable key, boolean greater)
2004          throws IOException {
2005        int curBid = currentLocation.getBlockIndex();
2006        long entryInBlock = reader.getBlockEntryCount(curBid);
2007        if (curBid == endLocation.getBlockIndex()) {
2008          entryInBlock = endLocation.getRecordIndex();
2009        }
2010
2011        while (currentLocation.getRecordIndex() < entryInBlock) {
2012          int cmp = compareCursorKeyTo(key);
2013          if (cmp > 0) return false;
2014          if (cmp == 0 && !greater) return true;
2015          if (!valueBufferInputStream.isClosed()) {
2016            valueBufferInputStream.close();
2017          }
2018          klen = -1;
2019          currentLocation.incRecordIndex();
2020        }
2021
2022        throw new RuntimeException("Cannot find matching key in block.");
2023      }
2024    }
2025
2026    long getBlockEntryCount(int curBid) {
2027      return tfileIndex.getEntry(curBid).entries();
2028    }
2029
2030    BlockReader getBlockReader(int blockIndex) throws IOException {
2031      return readerBCF.getDataBlock(blockIndex);
2032    }
2033  }
2034
2035  /**
2036   * Data structure representing "TFile.meta" meta block.
2037   */
2038  static final class TFileMeta {
2039    final static String BLOCK_NAME = "TFile.meta";
2040    final Version version;
2041    private long recordCount;
2042    private final String strComparator;
2043    private final BytesComparator comparator;
2044
2045    // ctor for writes
2046    public TFileMeta(String comparator) {
2047      // set fileVersion to API version when we create it.
2048      version = TFile.API_VERSION;
2049      recordCount = 0;
2050      strComparator = (comparator == null) ? "" : comparator;
2051      this.comparator = makeComparator(strComparator);
2052    }
2053
2054    // ctor for reads
2055    public TFileMeta(DataInput in) throws IOException {
2056      version = new Version(in);
2057      if (!version.compatibleWith(TFile.API_VERSION)) {
2058        throw new RuntimeException("Incompatible TFile fileVersion.");
2059      }
2060      recordCount = Utils.readVLong(in);
2061      strComparator = Utils.readString(in);
2062      comparator = makeComparator(strComparator);
2063    }
2064
2065    @SuppressWarnings("unchecked")
2066    static BytesComparator makeComparator(String comparator) {
2067      if (comparator.length() == 0) {
2068        // unsorted keys
2069        return null;
2070      }
2071      if (comparator.equals(COMPARATOR_MEMCMP)) {
2072        // default comparator
2073        return new BytesComparator(new MemcmpRawComparator());
2074      } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
2075        String compClassName =
2076            comparator.substring(COMPARATOR_JCLASS.length()).trim();
2077        try {
2078          Class compClass = Class.forName(compClassName);
2079          // use its default ctor to create an instance
2080          return new BytesComparator((RawComparator<Object>) compClass
2081              .newInstance());
2082        } catch (Exception e) {
2083          throw new IllegalArgumentException(
2084              "Failed to instantiate comparator: " + comparator + "("
2085                  + e.toString() + ")");
2086        }
2087      } else {
2088        throw new IllegalArgumentException("Unsupported comparator: "
2089            + comparator);
2090      }
2091    }
2092
2093    public void write(DataOutput out) throws IOException {
2094      TFile.API_VERSION.write(out);
2095      Utils.writeVLong(out, recordCount);
2096      Utils.writeString(out, strComparator);
2097    }
2098
2099    public long getRecordCount() {
2100      return recordCount;
2101    }
2102
2103    public void incRecordCount() {
2104      ++recordCount;
2105    }
2106
2107    public boolean isSorted() {
2108      return !strComparator.isEmpty();
2109    }
2110
2111    public String getComparatorString() {
2112      return strComparator;
2113    }
2114
2115    public BytesComparator getComparator() {
2116      return comparator;
2117    }
2118
2119    public Version getVersion() {
2120      return version;
2121    }
2122  } // END: class MetaTFileMeta
2123
2124  /**
2125   * Data structure representing "TFile.index" meta block.
2126   */
2127  static class TFileIndex {
2128    final static String BLOCK_NAME = "TFile.index";
2129    private ByteArray firstKey;
2130    private final ArrayList<TFileIndexEntry> index;
2131    private final ArrayList<Long> recordNumIndex;
2132    private final BytesComparator comparator;
2133    private long sum = 0;
2134    
2135    /**
2136     * For reading from file.
2137     * 
2138     * @throws IOException
2139     */
2140    public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
2141        throws IOException {
2142      index = new ArrayList<TFileIndexEntry>(entryCount);
2143      recordNumIndex = new ArrayList<Long>(entryCount);
2144      int size = Utils.readVInt(in); // size for the first key entry.
2145      if (size > 0) {
2146        byte[] buffer = new byte[size];
2147        in.readFully(buffer);
2148        DataInputStream firstKeyInputStream =
2149            new DataInputStream(new ByteArrayInputStream(buffer, 0, size));
2150
2151        int firstKeyLength = Utils.readVInt(firstKeyInputStream);
2152        firstKey = new ByteArray(new byte[firstKeyLength]);
2153        firstKeyInputStream.readFully(firstKey.buffer());
2154
2155        for (int i = 0; i < entryCount; i++) {
2156          size = Utils.readVInt(in);
2157          if (buffer.length < size) {
2158            buffer = new byte[size];
2159          }
2160          in.readFully(buffer, 0, size);
2161          TFileIndexEntry idx =
2162              new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
2163                  buffer, 0, size)));
2164          index.add(idx);
2165          sum += idx.entries();
2166          recordNumIndex.add(sum);
2167        }
2168      } else {
2169        if (entryCount != 0) {
2170          throw new RuntimeException("Internal error");
2171        }
2172      }
2173      this.comparator = comparator;
2174    }
2175
2176    /**
2177     * @param key
2178     *          input key.
2179     * @return the ID of the first block that contains key >= input key. Or -1
2180     *         if no such block exists.
2181     */
2182    public int lowerBound(RawComparable key) {
2183      if (comparator == null) {
2184        throw new RuntimeException("Cannot search in unsorted TFile");
2185      }
2186
2187      if (firstKey == null) {
2188        return -1; // not found
2189      }
2190
2191      int ret = Utils.lowerBound(index, key, comparator);
2192      if (ret == index.size()) {
2193        return -1;
2194      }
2195      return ret;
2196    }
2197
2198    /**
2199     * @param key
2200     *          input key.
2201     * @return the ID of the first block that contains key > input key. Or -1
2202     *         if no such block exists.
2203     */
2204    public int upperBound(RawComparable key) {
2205      if (comparator == null) {
2206        throw new RuntimeException("Cannot search in unsorted TFile");
2207      }
2208
2209      if (firstKey == null) {
2210        return -1; // not found
2211      }
2212
2213      int ret = Utils.upperBound(index, key, comparator);
2214      if (ret == index.size()) {
2215        return -1;
2216      }
2217      return ret;
2218    }
2219
2220    /**
2221     * For writing to file.
2222     */
2223    public TFileIndex(BytesComparator comparator) {
2224      index = new ArrayList<TFileIndexEntry>();
2225      recordNumIndex = new ArrayList<Long>();
2226      this.comparator = comparator;
2227    }
2228
2229    public RawComparable getFirstKey() {
2230      return firstKey;
2231    }
2232    
2233    public Reader.Location getLocationByRecordNum(long recNum) {
2234      int idx = Utils.upperBound(recordNumIndex, recNum);
2235      long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1);
2236      return new Reader.Location(idx, recNum-lastRecNum);
2237    }
2238
2239    public long getRecordNumByLocation(Reader.Location location) {
2240      int blkIndex = location.getBlockIndex();
2241      long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1);
2242      return lastRecNum + location.getRecordIndex();
2243    }
2244    
2245    public void setFirstKey(byte[] key, int offset, int length) {
2246      firstKey = new ByteArray(new byte[length]);
2247      System.arraycopy(key, offset, firstKey.buffer(), 0, length);
2248    }
2249
2250    public RawComparable getLastKey() {
2251      if (index.size() == 0) {
2252        return null;
2253      }
2254      return new ByteArray(index.get(index.size() - 1).buffer());
2255    }
2256
2257    public void addEntry(TFileIndexEntry keyEntry) {
2258      index.add(keyEntry);
2259      sum += keyEntry.entries();
2260      recordNumIndex.add(sum);
2261    }
2262
2263    public TFileIndexEntry getEntry(int bid) {
2264      return index.get(bid);
2265    }
2266
2267    public void write(DataOutput out) throws IOException {
2268      if (firstKey == null) {
2269        Utils.writeVInt(out, 0);
2270        return;
2271      }
2272
2273      DataOutputBuffer dob = new DataOutputBuffer();
2274      Utils.writeVInt(dob, firstKey.size());
2275      dob.write(firstKey.buffer());
2276      Utils.writeVInt(out, dob.size());
2277      out.write(dob.getData(), 0, dob.getLength());
2278
2279      for (TFileIndexEntry entry : index) {
2280        dob.reset();
2281        entry.write(dob);
2282        Utils.writeVInt(out, dob.getLength());
2283        out.write(dob.getData(), 0, dob.getLength());
2284      }
2285    }
2286  }
2287
2288  /**
2289   * TFile Data Index entry. We should try to make the memory footprint of each
2290   * index entry as small as possible.
2291   */
2292  static final class TFileIndexEntry implements RawComparable {
2293    final byte[] key;
2294    // count of <key, value> entries in the block.
2295    final long kvEntries;
2296
2297    public TFileIndexEntry(DataInput in) throws IOException {
2298      int len = Utils.readVInt(in);
2299      key = new byte[len];
2300      in.readFully(key, 0, len);
2301      kvEntries = Utils.readVLong(in);
2302    }
2303
2304    // default entry, without any padding
2305    public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) {
2306      key = new byte[len];
2307      System.arraycopy(newkey, offset, key, 0, len);
2308      this.kvEntries = entries;
2309    }
2310
2311    @Override
2312    public byte[] buffer() {
2313      return key;
2314    }
2315
2316    @Override
2317    public int offset() {
2318      return 0;
2319    }
2320
2321    @Override
2322    public int size() {
2323      return key.length;
2324    }
2325
2326    long entries() {
2327      return kvEntries;
2328    }
2329
2330    public void write(DataOutput out) throws IOException {
2331      Utils.writeVInt(out, key.length);
2332      out.write(key, 0, key.length);
2333      Utils.writeVLong(out, kvEntries);
2334    }
2335  }
2336
2337  /**
2338   * Dumping the TFile information.
2339   * 
2340   * @param args
2341   *          A list of TFile paths.
2342   */
2343  public static void main(String[] args) {
2344    System.out.printf("TFile Dumper (TFile %s, BCFile %s)%n", TFile.API_VERSION
2345        .toString(), BCFile.API_VERSION.toString());
2346    if (args.length == 0) {
2347      System.out
2348          .println("Usage: java ... org.apache.hadoop.io.file.tfile.TFile tfile-path [tfile-path ...]");
2349      System.exit(0);
2350    }
2351    Configuration conf = new Configuration();
2352
2353    for (String file : args) {
2354      System.out.println("===" + file + "===");
2355      try {
2356        TFileDumper.dumpInfo(file, System.out, conf);
2357      } catch (IOException e) {
2358        e.printStackTrace(System.err);
2359      }
2360    }
2361  }
2362}