001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with this 004 * work for additional information regarding copyright ownership. The ASF 005 * licenses this file to you under the Apache License, Version 2.0 (the 006 * "License"); you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 013 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 014 * License for the specific language governing permissions and limitations under 015 * the License. 016 */ 017 018 package org.apache.hadoop.io.file.tfile; 019 020 import java.io.ByteArrayInputStream; 021 import java.io.Closeable; 022 import java.io.DataInput; 023 import java.io.DataInputStream; 024 import java.io.DataOutput; 025 import java.io.DataOutputStream; 026 import java.io.EOFException; 027 import java.io.IOException; 028 import java.io.OutputStream; 029 import java.util.ArrayList; 030 import java.util.Comparator; 031 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 import org.apache.hadoop.classification.InterfaceAudience; 035 import org.apache.hadoop.classification.InterfaceStability; 036 import org.apache.hadoop.conf.Configuration; 037 import org.apache.hadoop.fs.FSDataInputStream; 038 import org.apache.hadoop.fs.FSDataOutputStream; 039 import org.apache.hadoop.io.BoundedByteArrayOutputStream; 040 import org.apache.hadoop.io.BytesWritable; 041 import org.apache.hadoop.io.DataInputBuffer; 042 import org.apache.hadoop.io.DataOutputBuffer; 043 import org.apache.hadoop.io.IOUtils; 044 import org.apache.hadoop.io.RawComparator; 045 import org.apache.hadoop.io.WritableComparator; 046 
import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader; 047 import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender; 048 import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder; 049 import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder; 050 import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator; 051 import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator; 052 import org.apache.hadoop.io.file.tfile.Utils.Version; 053 import org.apache.hadoop.io.serializer.JavaSerializationComparator; 054 055 /** 056 * A TFile is a container of key-value pairs. Both keys and values are type-less 057 * bytes. Keys are restricted to 64KB, value length is not restricted 058 * (practically limited to the available disk storage). TFile further provides 059 * the following features: 060 * <ul> 061 * <li>Block Compression. 062 * <li>Named meta data blocks. 063 * <li>Sorted or unsorted keys. 064 * <li>Seek by key or by file offset. 065 * </ul> 066 * The memory footprint of a TFile includes the following: 067 * <ul> 068 * <li>Some constant overhead of reading or writing a compressed block. 069 * <ul> 070 * <li>Each compressed block requires one compression/decompression codec for 071 * I/O. 072 * <li>Temporary space to buffer the key. 073 * <li>Temporary space to buffer the value (for TFile.Writer only). Values are 074 * chunk encoded, so that we buffer at most one chunk of user data. By default, 075 * the chunk buffer is 1MB. Reading chunked value does not require additional 076 * memory. 077 * </ul> 078 * <li>TFile index, which is proportional to the total number of Data Blocks. 079 * The total amount of memory needed to hold the index can be estimated as 080 * (56+AvgKeySize)*NumBlocks. 081 * <li>MetaBlock index, which is proportional to the total number of Meta 082 * Blocks.The total amount of memory needed to hold the index for Meta Blocks 083 * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock. 
 * </ul>
 * <p>
 * The behavior of TFile can be customized by the following variables through
 * Configuration:
 * <ul>
 * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes). Defaults
 * to 1MB. Values shorter than the chunk size are guaranteed to have a known
 * value length at read time (See
 * {@link TFile.Reader.Scanner.Entry#isValueLengthKnown()}).
 * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for
 * FSDataOutputStream. Integer (in bytes). Defaults to 256KB.
 * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for
 * FSDataInputStream. Integer (in bytes). Defaults to 256KB.
 * </ul>
 * <p>
 * Suggestions on performance optimization.
 * <ul>
 * <li>Minimum block size. We recommend a setting of minimum block size between
 * 256KB and 1MB for general usage. A larger block size is preferred if files are
 * primarily for sequential access. However, it would lead to inefficient random
 * access (because there is more data to decompress). Smaller blocks are good
 * for random access, but require more memory to hold the block index, and may
 * be slower to create (because we must flush the compressor stream at the
 * conclusion of each data block, which leads to an FS I/O flush). Further, due
 * to the internal caching in the Compression codec, the smallest possible block
 * size would be around 20KB-30KB.
 * <li>The current implementation does not offer true multi-threading for
 * reading. The implementation uses FSDataInputStream seek()+read(), which is
 * shown to be much faster than the positioned-read call in single thread mode.
 * However, it also means that if multiple threads attempt to access the same
 * TFile (using multiple scanners) simultaneously, the actual I/O is carried out
 * sequentially even if they access different DFS blocks.
 * <li>Compression codec.
Use "none" if the data is not very compressable (by 117 * compressable, I mean a compression ratio at least 2:1). Generally, use "lzo" 118 * as the starting point for experimenting. "gz" overs slightly better 119 * compression ratio over "lzo" but requires 4x CPU to compress and 2x CPU to 120 * decompress, comparing to "lzo". 121 * <li>File system buffering, if the underlying FSDataInputStream and 122 * FSDataOutputStream is already adequately buffered; or if applications 123 * reads/writes keys and values in large buffers, we can reduce the sizes of 124 * input/output buffering in TFile layer by setting the configuration parameters 125 * "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size". 126 * </ul> 127 * 128 * Some design rationale behind TFile can be found at <a 129 * href=https://issues.apache.org/jira/browse/HADOOP-3315>Hadoop-3315</a>. 130 */ 131 @InterfaceAudience.Public 132 @InterfaceStability.Evolving 133 public class TFile { 134 static final Log LOG = LogFactory.getLog(TFile.class); 135 136 private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size"; 137 private static final String FS_INPUT_BUF_SIZE_ATTR = 138 "tfile.fs.input.buffer.size"; 139 private static final String FS_OUTPUT_BUF_SIZE_ATTR = 140 "tfile.fs.output.buffer.size"; 141 142 static int getChunkBufferSize(Configuration conf) { 143 int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024); 144 return (ret > 0) ? 
ret : 1024 * 1024; 145 } 146 147 static int getFSInputBufferSize(Configuration conf) { 148 return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024); 149 } 150 151 static int getFSOutputBufferSize(Configuration conf) { 152 return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024); 153 } 154 155 private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB 156 static final Version API_VERSION = new Version((short) 1, (short) 0); 157 158 /** compression: gzip */ 159 public static final String COMPRESSION_GZ = "gz"; 160 /** compression: lzo */ 161 public static final String COMPRESSION_LZO = "lzo"; 162 /** compression: none */ 163 public static final String COMPRESSION_NONE = "none"; 164 /** comparator: memcmp */ 165 public static final String COMPARATOR_MEMCMP = "memcmp"; 166 /** comparator prefix: java class */ 167 public static final String COMPARATOR_JCLASS = "jclass:"; 168 169 /** 170 * Make a raw comparator from a string name. 171 * 172 * @param name 173 * Comparator name 174 * @return A RawComparable comparator. 175 */ 176 static public Comparator<RawComparable> makeComparator(String name) { 177 return TFileMeta.makeComparator(name); 178 } 179 180 // Prevent the instantiation of TFiles 181 private TFile() { 182 // nothing 183 } 184 185 /** 186 * Get names of supported compression algorithms. The names are acceptable by 187 * TFile.Writer. 188 * 189 * @return Array of strings, each represents a supported compression 190 * algorithm. Currently, the following compression algorithms are 191 * supported. 192 * <ul> 193 * <li>"none" - No compression. 194 * <li>"lzo" - LZO compression. 195 * <li>"gz" - GZIP compression. 196 * </ul> 197 */ 198 public static String[] getSupportedCompressionAlgorithms() { 199 return Compression.getSupportedAlgorithms(); 200 } 201 202 /** 203 * TFile Writer. 204 */ 205 @InterfaceStability.Evolving 206 public static class Writer implements Closeable { 207 // minimum compressed size for a block. 
208 private final int sizeMinBlock; 209 210 // Meta blocks. 211 final TFileIndex tfileIndex; 212 final TFileMeta tfileMeta; 213 214 // reference to the underlying BCFile. 215 private BCFile.Writer writerBCF; 216 217 // current data block appender. 218 BlockAppender blkAppender; 219 long blkRecordCount; 220 221 // buffers for caching the key. 222 BoundedByteArrayOutputStream currentKeyBufferOS; 223 BoundedByteArrayOutputStream lastKeyBufferOS; 224 225 // buffer used by chunk codec 226 private byte[] valueBuffer; 227 228 /** 229 * Writer states. The state always transits in circles: READY -> IN_KEY -> 230 * END_KEY -> IN_VALUE -> READY. 231 */ 232 private enum State { 233 READY, // Ready to start a new key-value pair insertion. 234 IN_KEY, // In the middle of key insertion. 235 END_KEY, // Key insertion complete, ready to insert value. 236 IN_VALUE, // In value insertion. 237 // ERROR, // Error encountered, cannot continue. 238 CLOSED, // TFile already closed. 239 }; 240 241 // current state of Writer. 242 State state = State.READY; 243 Configuration conf; 244 long errorCount = 0; 245 246 /** 247 * Constructor 248 * 249 * @param fsdos 250 * output stream for writing. Must be at position 0. 251 * @param minBlockSize 252 * Minimum compressed block size in bytes. A compression block will 253 * not be closed until it reaches this size except for the last 254 * block. 255 * @param compressName 256 * Name of the compression algorithm. Must be one of the strings 257 * returned by {@link TFile#getSupportedCompressionAlgorithms()}. 258 * @param comparator 259 * Leave comparator as null or empty string if TFile is not sorted. 260 * Otherwise, provide the string name for the comparison algorithm 261 * for keys. Two kinds of comparators are supported. 262 * <ul> 263 * <li>Algorithmic comparator: binary comparators that is language 264 * independent. Currently, only "memcmp" is supported. 
265 * <li>Language-specific comparator: binary comparators that can 266 * only be constructed in specific language. For Java, the syntax 267 * is "jclass:", followed by the class name of the RawComparator. 268 * Currently, we only support RawComparators that can be 269 * constructed through the default constructor (with no 270 * parameters). Parameterized RawComparators such as 271 * {@link WritableComparator} or 272 * {@link JavaSerializationComparator} may not be directly used. 273 * One should write a wrapper class that inherits from such classes 274 * and use its default constructor to perform proper 275 * initialization. 276 * </ul> 277 * @param conf 278 * The configuration object. 279 * @throws IOException 280 */ 281 public Writer(FSDataOutputStream fsdos, int minBlockSize, 282 String compressName, String comparator, Configuration conf) 283 throws IOException { 284 sizeMinBlock = minBlockSize; 285 tfileMeta = new TFileMeta(comparator); 286 tfileIndex = new TFileIndex(tfileMeta.getComparator()); 287 288 writerBCF = new BCFile.Writer(fsdos, compressName, conf); 289 currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE); 290 lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE); 291 this.conf = conf; 292 } 293 294 /** 295 * Close the Writer. Resources will be released regardless of the exceptions 296 * being thrown. Future close calls will have no effect. 297 * 298 * The underlying FSDataOutputStream is not closed. 299 */ 300 @Override 301 public void close() throws IOException { 302 if ((state == State.CLOSED)) { 303 return; 304 } 305 try { 306 // First try the normal finish. 307 // Terminate upon the first Exception. 
308 if (errorCount == 0) { 309 if (state != State.READY) { 310 throw new IllegalStateException( 311 "Cannot close TFile in the middle of key-value insertion."); 312 } 313 314 finishDataBlock(true); 315 316 // first, write out data:TFile.meta 317 BlockAppender outMeta = 318 writerBCF 319 .prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE); 320 try { 321 tfileMeta.write(outMeta); 322 } finally { 323 outMeta.close(); 324 } 325 326 // second, write out data:TFile.index 327 BlockAppender outIndex = 328 writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME); 329 try { 330 tfileIndex.write(outIndex); 331 } finally { 332 outIndex.close(); 333 } 334 335 writerBCF.close(); 336 } 337 } finally { 338 IOUtils.cleanup(LOG, blkAppender, writerBCF); 339 blkAppender = null; 340 writerBCF = null; 341 state = State.CLOSED; 342 } 343 } 344 345 /** 346 * Adding a new key-value pair to the TFile. This is synonymous to 347 * append(key, 0, key.length, value, 0, value.length) 348 * 349 * @param key 350 * Buffer for key. 351 * @param value 352 * Buffer for value. 353 * @throws IOException 354 */ 355 public void append(byte[] key, byte[] value) throws IOException { 356 append(key, 0, key.length, value, 0, value.length); 357 } 358 359 /** 360 * Adding a new key-value pair to TFile. 361 * 362 * @param key 363 * buffer for key. 364 * @param koff 365 * offset in key buffer. 366 * @param klen 367 * length of key. 368 * @param value 369 * buffer for value. 370 * @param voff 371 * offset in value buffer. 372 * @param vlen 373 * length of value. 374 * @throws IOException 375 * Upon IO errors. 376 * <p> 377 * If an exception is thrown, the TFile will be in an inconsistent 378 * state. 
The only legitimate call after that would be close 379 */ 380 public void append(byte[] key, int koff, int klen, byte[] value, int voff, 381 int vlen) throws IOException { 382 if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) { 383 throw new IndexOutOfBoundsException( 384 "Bad key buffer offset-length combination."); 385 } 386 387 if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) { 388 throw new IndexOutOfBoundsException( 389 "Bad value buffer offset-length combination."); 390 } 391 392 try { 393 DataOutputStream dosKey = prepareAppendKey(klen); 394 try { 395 ++errorCount; 396 dosKey.write(key, koff, klen); 397 --errorCount; 398 } finally { 399 dosKey.close(); 400 } 401 402 DataOutputStream dosValue = prepareAppendValue(vlen); 403 try { 404 ++errorCount; 405 dosValue.write(value, voff, vlen); 406 --errorCount; 407 } finally { 408 dosValue.close(); 409 } 410 } finally { 411 state = State.READY; 412 } 413 } 414 415 /** 416 * Helper class to register key after close call on key append stream. 417 */ 418 private class KeyRegister extends DataOutputStream { 419 private final int expectedLength; 420 private boolean closed = false; 421 422 public KeyRegister(int len) { 423 super(currentKeyBufferOS); 424 if (len >= 0) { 425 currentKeyBufferOS.reset(len); 426 } else { 427 currentKeyBufferOS.reset(); 428 } 429 expectedLength = len; 430 } 431 432 @Override 433 public void close() throws IOException { 434 if (closed == true) { 435 return; 436 } 437 438 try { 439 ++errorCount; 440 byte[] key = currentKeyBufferOS.getBuffer(); 441 int len = currentKeyBufferOS.size(); 442 /** 443 * verify length. 
444 */ 445 if (expectedLength >= 0 && expectedLength != len) { 446 throw new IOException("Incorrect key length: expected=" 447 + expectedLength + " actual=" + len); 448 } 449 450 Utils.writeVInt(blkAppender, len); 451 blkAppender.write(key, 0, len); 452 if (tfileIndex.getFirstKey() == null) { 453 tfileIndex.setFirstKey(key, 0, len); 454 } 455 456 if (tfileMeta.isSorted() && tfileMeta.getRecordCount()>0) { 457 byte[] lastKey = lastKeyBufferOS.getBuffer(); 458 int lastLen = lastKeyBufferOS.size(); 459 if (tfileMeta.getComparator().compare(key, 0, len, lastKey, 0, 460 lastLen) < 0) { 461 throw new IOException("Keys are not added in sorted order"); 462 } 463 } 464 465 BoundedByteArrayOutputStream tmp = currentKeyBufferOS; 466 currentKeyBufferOS = lastKeyBufferOS; 467 lastKeyBufferOS = tmp; 468 --errorCount; 469 } finally { 470 closed = true; 471 state = State.END_KEY; 472 } 473 } 474 } 475 476 /** 477 * Helper class to register value after close call on value append stream. 478 */ 479 private class ValueRegister extends DataOutputStream { 480 private boolean closed = false; 481 482 public ValueRegister(OutputStream os) { 483 super(os); 484 } 485 486 // Avoiding flushing call to down stream. 487 @Override 488 public void flush() { 489 // do nothing 490 } 491 492 @Override 493 public void close() throws IOException { 494 if (closed == true) { 495 return; 496 } 497 498 try { 499 ++errorCount; 500 super.close(); 501 blkRecordCount++; 502 // bump up the total record count in the whole file 503 tfileMeta.incRecordCount(); 504 finishDataBlock(false); 505 --errorCount; 506 } finally { 507 closed = true; 508 state = State.READY; 509 } 510 } 511 } 512 513 /** 514 * Obtain an output stream for writing a key into TFile. This may only be 515 * called when there is no active Key appending stream or value appending 516 * stream. 517 * 518 * @param length 519 * The expected length of the key. If length of the key is not 520 * known, set length = -1. 
Otherwise, the application must write 521 * exactly as many bytes as specified here before calling close on 522 * the returned output stream. 523 * @return The key appending output stream. 524 * @throws IOException 525 * 526 */ 527 public DataOutputStream prepareAppendKey(int length) throws IOException { 528 if (state != State.READY) { 529 throw new IllegalStateException("Incorrect state to start a new key: " 530 + state.name()); 531 } 532 533 initDataBlock(); 534 DataOutputStream ret = new KeyRegister(length); 535 state = State.IN_KEY; 536 return ret; 537 } 538 539 /** 540 * Obtain an output stream for writing a value into TFile. This may only be 541 * called right after a key appending operation (the key append stream must 542 * be closed). 543 * 544 * @param length 545 * The expected length of the value. If length of the value is not 546 * known, set length = -1. Otherwise, the application must write 547 * exactly as many bytes as specified here before calling close on 548 * the returned output stream. Advertising the value size up-front 549 * guarantees that the value is encoded in one chunk, and avoids 550 * intermediate chunk buffering. 551 * @throws IOException 552 * 553 */ 554 public DataOutputStream prepareAppendValue(int length) throws IOException { 555 if (state != State.END_KEY) { 556 throw new IllegalStateException( 557 "Incorrect state to start a new value: " + state.name()); 558 } 559 560 DataOutputStream ret; 561 562 // unknown length 563 if (length < 0) { 564 if (valueBuffer == null) { 565 valueBuffer = new byte[getChunkBufferSize(conf)]; 566 } 567 ret = new ValueRegister(new ChunkEncoder(blkAppender, valueBuffer)); 568 } else { 569 ret = 570 new ValueRegister(new Chunk.SingleChunkEncoder(blkAppender, length)); 571 } 572 573 state = State.IN_VALUE; 574 return ret; 575 } 576 577 /** 578 * Obtain an output stream for creating a meta block. This function may not 579 * be called when there is a key append stream or value append stream 580 * active. 
No more key-value insertion is allowed after a meta data block 581 * has been added to TFile. 582 * 583 * @param name 584 * Name of the meta block. 585 * @param compressName 586 * Name of the compression algorithm to be used. Must be one of the 587 * strings returned by 588 * {@link TFile#getSupportedCompressionAlgorithms()}. 589 * @return A DataOutputStream that can be used to write Meta Block data. 590 * Closing the stream would signal the ending of the block. 591 * @throws IOException 592 * @throws MetaBlockAlreadyExists 593 * the Meta Block with the same name already exists. 594 */ 595 public DataOutputStream prepareMetaBlock(String name, String compressName) 596 throws IOException, MetaBlockAlreadyExists { 597 if (state != State.READY) { 598 throw new IllegalStateException( 599 "Incorrect state to start a Meta Block: " + state.name()); 600 } 601 602 finishDataBlock(true); 603 DataOutputStream outputStream = 604 writerBCF.prepareMetaBlock(name, compressName); 605 return outputStream; 606 } 607 608 /** 609 * Obtain an output stream for creating a meta block. This function may not 610 * be called when there is a key append stream or value append stream 611 * active. No more key-value insertion is allowed after a meta data block 612 * has been added to TFile. Data will be compressed using the default 613 * compressor as defined in Writer's constructor. 614 * 615 * @param name 616 * Name of the meta block. 617 * @return A DataOutputStream that can be used to write Meta Block data. 618 * Closing the stream would signal the ending of the block. 619 * @throws IOException 620 * @throws MetaBlockAlreadyExists 621 * the Meta Block with the same name already exists. 
622 */ 623 public DataOutputStream prepareMetaBlock(String name) throws IOException, 624 MetaBlockAlreadyExists { 625 if (state != State.READY) { 626 throw new IllegalStateException( 627 "Incorrect state to start a Meta Block: " + state.name()); 628 } 629 630 finishDataBlock(true); 631 return writerBCF.prepareMetaBlock(name); 632 } 633 634 /** 635 * Check if we need to start a new data block. 636 * 637 * @throws IOException 638 */ 639 private void initDataBlock() throws IOException { 640 // for each new block, get a new appender 641 if (blkAppender == null) { 642 blkAppender = writerBCF.prepareDataBlock(); 643 } 644 } 645 646 /** 647 * Close the current data block if necessary. 648 * 649 * @param bForceFinish 650 * Force the closure regardless of the block size. 651 * @throws IOException 652 */ 653 void finishDataBlock(boolean bForceFinish) throws IOException { 654 if (blkAppender == null) { 655 return; 656 } 657 658 // exceeded the size limit, do the compression and finish the block 659 if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) { 660 // keep tracks of the last key of each data block, no padding 661 // for now 662 TFileIndexEntry keyLast = 663 new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS 664 .size(), blkRecordCount); 665 tfileIndex.addEntry(keyLast); 666 // close the appender 667 blkAppender.close(); 668 blkAppender = null; 669 blkRecordCount = 0; 670 } 671 } 672 } 673 674 /** 675 * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner. 676 * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()} 677 * ) , a portion of TFile based on byte offsets ( 678 * {@link Reader#createScannerByByteRange(long, long)}), or a portion of TFile with keys 679 * fall in a certain key range (for sorted TFile only, 680 * {@link Reader#createScannerByKey(byte[], byte[])} or 681 * {@link Reader#createScannerByKey(RawComparable, RawComparable)}). 
682 */ 683 @InterfaceStability.Evolving 684 public static class Reader implements Closeable { 685 // The underlying BCFile reader. 686 final BCFile.Reader readerBCF; 687 688 // TFile index, it is loaded lazily. 689 TFileIndex tfileIndex = null; 690 final TFileMeta tfileMeta; 691 final BytesComparator comparator; 692 693 // global begin and end locations. 694 private final Location begin; 695 private final Location end; 696 697 /** 698 * Location representing a virtual position in the TFile. 699 */ 700 static final class Location implements Comparable<Location>, Cloneable { 701 private int blockIndex; 702 // distance/offset from the beginning of the block 703 private long recordIndex; 704 705 Location(int blockIndex, long recordIndex) { 706 set(blockIndex, recordIndex); 707 } 708 709 void incRecordIndex() { 710 ++recordIndex; 711 } 712 713 Location(Location other) { 714 set(other); 715 } 716 717 int getBlockIndex() { 718 return blockIndex; 719 } 720 721 long getRecordIndex() { 722 return recordIndex; 723 } 724 725 void set(int blockIndex, long recordIndex) { 726 if ((blockIndex | recordIndex) < 0) { 727 throw new IllegalArgumentException( 728 "Illegal parameter for BlockLocation."); 729 } 730 this.blockIndex = blockIndex; 731 this.recordIndex = recordIndex; 732 } 733 734 void set(Location other) { 735 set(other.blockIndex, other.recordIndex); 736 } 737 738 /** 739 * @see java.lang.Comparable#compareTo(java.lang.Object) 740 */ 741 @Override 742 public int compareTo(Location other) { 743 return compareTo(other.blockIndex, other.recordIndex); 744 } 745 746 int compareTo(int bid, long rid) { 747 if (this.blockIndex == bid) { 748 long ret = this.recordIndex - rid; 749 if (ret > 0) return 1; 750 if (ret < 0) return -1; 751 return 0; 752 } 753 return this.blockIndex - bid; 754 } 755 756 /** 757 * @see java.lang.Object#clone() 758 */ 759 @Override 760 protected Location clone() { 761 return new Location(blockIndex, recordIndex); 762 } 763 764 /** 765 * @see 
java.lang.Object#hashCode() 766 */ 767 @Override 768 public int hashCode() { 769 final int prime = 31; 770 int result = prime + blockIndex; 771 result = (int) (prime * result + recordIndex); 772 return result; 773 } 774 775 /** 776 * @see java.lang.Object#equals(java.lang.Object) 777 */ 778 @Override 779 public boolean equals(Object obj) { 780 if (this == obj) return true; 781 if (obj == null) return false; 782 if (getClass() != obj.getClass()) return false; 783 Location other = (Location) obj; 784 if (blockIndex != other.blockIndex) return false; 785 if (recordIndex != other.recordIndex) return false; 786 return true; 787 } 788 } 789 790 /** 791 * Constructor 792 * 793 * @param fsdis 794 * FS input stream of the TFile. 795 * @param fileLength 796 * The length of TFile. This is required because we have no easy 797 * way of knowing the actual size of the input file through the 798 * File input stream. 799 * @param conf 800 * @throws IOException 801 */ 802 public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf) 803 throws IOException { 804 readerBCF = new BCFile.Reader(fsdis, fileLength, conf); 805 806 // first, read TFile meta 807 BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME); 808 try { 809 tfileMeta = new TFileMeta(brMeta); 810 } finally { 811 brMeta.close(); 812 } 813 814 comparator = tfileMeta.getComparator(); 815 // Set begin and end locations. 816 begin = new Location(0, 0); 817 end = new Location(readerBCF.getBlockCount(), 0); 818 } 819 820 /** 821 * Close the reader. The state of the Reader object is undefined after 822 * close. Calling close() for multiple times has no effect. 823 */ 824 @Override 825 public void close() throws IOException { 826 readerBCF.close(); 827 } 828 829 /** 830 * Get the begin location of the TFile. 831 * 832 * @return If TFile is not empty, the location of the first key-value pair. 833 * Otherwise, it returns end(). 
834 */ 835 Location begin() { 836 return begin; 837 } 838 839 /** 840 * Get the end location of the TFile. 841 * 842 * @return The location right after the last key-value pair in TFile. 843 */ 844 Location end() { 845 return end; 846 } 847 848 /** 849 * Get the string representation of the comparator. 850 * 851 * @return If the TFile is not sorted by keys, an empty string will be 852 * returned. Otherwise, the actual comparator string that is 853 * provided during the TFile creation time will be returned. 854 */ 855 public String getComparatorName() { 856 return tfileMeta.getComparatorString(); 857 } 858 859 /** 860 * Is the TFile sorted? 861 * 862 * @return true if TFile is sorted. 863 */ 864 public boolean isSorted() { 865 return tfileMeta.isSorted(); 866 } 867 868 /** 869 * Get the number of key-value pair entries in TFile. 870 * 871 * @return the number of key-value pairs in TFile 872 */ 873 public long getEntryCount() { 874 return tfileMeta.getRecordCount(); 875 } 876 877 /** 878 * Lazily loading the TFile index. 879 * 880 * @throws IOException 881 */ 882 synchronized void checkTFileDataIndex() throws IOException { 883 if (tfileIndex == null) { 884 BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME); 885 try { 886 tfileIndex = 887 new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta 888 .getComparator()); 889 } finally { 890 brIndex.close(); 891 } 892 } 893 } 894 895 /** 896 * Get the first key in the TFile. 897 * 898 * @return The first key in the TFile. 899 * @throws IOException 900 */ 901 public RawComparable getFirstKey() throws IOException { 902 checkTFileDataIndex(); 903 return tfileIndex.getFirstKey(); 904 } 905 906 /** 907 * Get the last key in the TFile. 908 * 909 * @return The last key in the TFile. 910 * @throws IOException 911 */ 912 public RawComparable getLastKey() throws IOException { 913 checkTFileDataIndex(); 914 return tfileIndex.getLastKey(); 915 } 916 917 /** 918 * Get a Comparator object to compare Entries. 
It is useful when you want 919 * stores the entries in a collection (such as PriorityQueue) and perform 920 * sorting or comparison among entries based on the keys without copying out 921 * the key. 922 * 923 * @return An Entry Comparator.. 924 */ 925 public Comparator<Scanner.Entry> getEntryComparator() { 926 if (!isSorted()) { 927 throw new RuntimeException( 928 "Entries are not comparable for unsorted TFiles"); 929 } 930 931 return new Comparator<Scanner.Entry>() { 932 /** 933 * Provide a customized comparator for Entries. This is useful if we 934 * have a collection of Entry objects. However, if the Entry objects 935 * come from different TFiles, users must ensure that those TFiles share 936 * the same RawComparator. 937 */ 938 @Override 939 public int compare(Scanner.Entry o1, Scanner.Entry o2) { 940 return comparator.compare(o1.getKeyBuffer(), 0, o1.getKeyLength(), o2 941 .getKeyBuffer(), 0, o2.getKeyLength()); 942 } 943 }; 944 } 945 946 /** 947 * Get an instance of the RawComparator that is constructed based on the 948 * string comparator representation. 949 * 950 * @return a Comparator that can compare RawComparable's. 951 */ 952 public Comparator<RawComparable> getComparator() { 953 return comparator; 954 } 955 956 /** 957 * Stream access to a meta block.`` 958 * 959 * @param name 960 * The name of the meta block. 961 * @return The input stream. 962 * @throws IOException 963 * on I/O error. 964 * @throws MetaBlockDoesNotExist 965 * If the meta block with the name does not exist. 966 */ 967 public DataInputStream getMetaBlock(String name) throws IOException, 968 MetaBlockDoesNotExist { 969 return readerBCF.getMetaBlock(name); 970 } 971 972 /** 973 * if greater is true then returns the beginning location of the block 974 * containing the key strictly greater than input key. 
if greater is false 975 * then returns the beginning location of the block greater than equal to 976 * the input key 977 * 978 * @param key 979 * the input key 980 * @param greater 981 * boolean flag 982 * @return 983 * @throws IOException 984 */ 985 Location getBlockContainsKey(RawComparable key, boolean greater) 986 throws IOException { 987 if (!isSorted()) { 988 throw new RuntimeException("Seeking in unsorted TFile"); 989 } 990 checkTFileDataIndex(); 991 int blkIndex = 992 (greater) ? tfileIndex.upperBound(key) : tfileIndex.lowerBound(key); 993 if (blkIndex < 0) return end; 994 return new Location(blkIndex, 0); 995 } 996 997 Location getLocationByRecordNum(long recNum) throws IOException { 998 checkTFileDataIndex(); 999 return tfileIndex.getLocationByRecordNum(recNum); 1000 } 1001 1002 long getRecordNumByLocation(Location location) throws IOException { 1003 checkTFileDataIndex(); 1004 return tfileIndex.getRecordNumByLocation(location); 1005 } 1006 1007 int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) { 1008 if (!isSorted()) { 1009 throw new RuntimeException("Cannot compare keys for unsorted TFiles."); 1010 } 1011 return comparator.compare(a, o1, l1, b, o2, l2); 1012 } 1013 1014 int compareKeys(RawComparable a, RawComparable b) { 1015 if (!isSorted()) { 1016 throw new RuntimeException("Cannot compare keys for unsorted TFiles."); 1017 } 1018 return comparator.compare(a, b); 1019 } 1020 1021 /** 1022 * Get the location pointing to the beginning of the first key-value pair in 1023 * a compressed block whose byte offset in the TFile is greater than or 1024 * equal to the specified offset. 1025 * 1026 * @param offset 1027 * the user supplied offset. 1028 * @return the location to the corresponding entry; or end() if no such 1029 * entry exists. 
1030 */ 1031 Location getLocationNear(long offset) { 1032 int blockIndex = readerBCF.getBlockIndexNear(offset); 1033 if (blockIndex == -1) return end; 1034 return new Location(blockIndex, 0); 1035 } 1036 1037 /** 1038 * Get the RecordNum for the first key-value pair in a compressed block 1039 * whose byte offset in the TFile is greater than or equal to the specified 1040 * offset. 1041 * 1042 * @param offset 1043 * the user supplied offset. 1044 * @return the RecordNum to the corresponding entry. If no such entry 1045 * exists, it returns the total entry count. 1046 * @throws IOException 1047 */ 1048 public long getRecordNumNear(long offset) throws IOException { 1049 return getRecordNumByLocation(getLocationNear(offset)); 1050 } 1051 1052 /** 1053 * Get a sample key that is within a block whose starting offset is greater 1054 * than or equal to the specified offset. 1055 * 1056 * @param offset 1057 * The file offset. 1058 * @return the key that fits the requirement; or null if no such key exists 1059 * (which could happen if the offset is close to the end of the 1060 * TFile). 1061 * @throws IOException 1062 */ 1063 public RawComparable getKeyNear(long offset) throws IOException { 1064 int blockIndex = readerBCF.getBlockIndexNear(offset); 1065 if (blockIndex == -1) return null; 1066 checkTFileDataIndex(); 1067 return new ByteArray(tfileIndex.getEntry(blockIndex).key); 1068 } 1069 1070 /** 1071 * Get a scanner than can scan the whole TFile. 1072 * 1073 * @return The scanner object. A valid Scanner is always returned even if 1074 * the TFile is empty. 1075 * @throws IOException 1076 */ 1077 public Scanner createScanner() throws IOException { 1078 return new Scanner(this, begin, end); 1079 } 1080 1081 /** 1082 * Get a scanner that covers a portion of TFile based on byte offsets. 1083 * 1084 * @param offset 1085 * The beginning byte offset in the TFile. 1086 * @param length 1087 * The length of the region. 
1088 * @return The actual coverage of the returned scanner tries to match the 1089 * specified byte-region but always round up to the compression 1090 * block boundaries. It is possible that the returned scanner 1091 * contains zero key-value pairs even if length is positive. 1092 * @throws IOException 1093 */ 1094 public Scanner createScannerByByteRange(long offset, long length) throws IOException { 1095 return new Scanner(this, offset, offset + length); 1096 } 1097 1098 /** 1099 * Get a scanner that covers a portion of TFile based on keys. 1100 * 1101 * @param beginKey 1102 * Begin key of the scan (inclusive). If null, scan from the first 1103 * key-value entry of the TFile. 1104 * @param endKey 1105 * End key of the scan (exclusive). If null, scan up to the last 1106 * key-value entry of the TFile. 1107 * @return The actual coverage of the returned scanner will cover all keys 1108 * greater than or equal to the beginKey and less than the endKey. 1109 * @throws IOException 1110 * 1111 * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead. 1112 */ 1113 @Deprecated 1114 public Scanner createScanner(byte[] beginKey, byte[] endKey) 1115 throws IOException { 1116 return createScannerByKey(beginKey, endKey); 1117 } 1118 1119 /** 1120 * Get a scanner that covers a portion of TFile based on keys. 1121 * 1122 * @param beginKey 1123 * Begin key of the scan (inclusive). If null, scan from the first 1124 * key-value entry of the TFile. 1125 * @param endKey 1126 * End key of the scan (exclusive). If null, scan up to the last 1127 * key-value entry of the TFile. 1128 * @return The actual coverage of the returned scanner will cover all keys 1129 * greater than or equal to the beginKey and less than the endKey. 1130 * @throws IOException 1131 */ 1132 public Scanner createScannerByKey(byte[] beginKey, byte[] endKey) 1133 throws IOException { 1134 return createScannerByKey((beginKey == null) ? 
null : new ByteArray(beginKey, 1135 0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey, 1136 0, endKey.length)); 1137 } 1138 1139 /** 1140 * Get a scanner that covers a specific key range. 1141 * 1142 * @param beginKey 1143 * Begin key of the scan (inclusive). If null, scan from the first 1144 * key-value entry of the TFile. 1145 * @param endKey 1146 * End key of the scan (exclusive). If null, scan up to the last 1147 * key-value entry of the TFile. 1148 * @return The actual coverage of the returned scanner will cover all keys 1149 * greater than or equal to the beginKey and less than the endKey. 1150 * @throws IOException 1151 * 1152 * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)} 1153 * instead. 1154 */ 1155 @Deprecated 1156 public Scanner createScanner(RawComparable beginKey, RawComparable endKey) 1157 throws IOException { 1158 return createScannerByKey(beginKey, endKey); 1159 } 1160 1161 /** 1162 * Get a scanner that covers a specific key range. 1163 * 1164 * @param beginKey 1165 * Begin key of the scan (inclusive). If null, scan from the first 1166 * key-value entry of the TFile. 1167 * @param endKey 1168 * End key of the scan (exclusive). If null, scan up to the last 1169 * key-value entry of the TFile. 1170 * @return The actual coverage of the returned scanner will cover all keys 1171 * greater than or equal to the beginKey and less than the endKey. 1172 * @throws IOException 1173 */ 1174 public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey) 1175 throws IOException { 1176 if ((beginKey != null) && (endKey != null) 1177 && (compareKeys(beginKey, endKey) >= 0)) { 1178 return new Scanner(this, beginKey, beginKey); 1179 } 1180 return new Scanner(this, beginKey, endKey); 1181 } 1182 1183 /** 1184 * Create a scanner that covers a range of records. 1185 * 1186 * @param beginRecNum 1187 * The RecordNum for the first record (inclusive). 
1188 * @param endRecNum 1189 * The RecordNum for the last record (exclusive). To scan the whole 1190 * file, either specify endRecNum==-1 or endRecNum==getEntryCount(). 1191 * @return The TFile scanner that covers the specified range of records. 1192 * @throws IOException 1193 */ 1194 public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum) 1195 throws IOException { 1196 if (beginRecNum < 0) beginRecNum = 0; 1197 if (endRecNum < 0 || endRecNum > getEntryCount()) { 1198 endRecNum = getEntryCount(); 1199 } 1200 return new Scanner(this, getLocationByRecordNum(beginRecNum), 1201 getLocationByRecordNum(endRecNum)); 1202 } 1203 1204 /** 1205 * The TFile Scanner. The Scanner has an implicit cursor, which, upon 1206 * creation, points to the first key-value pair in the scan range. If the 1207 * scan range is empty, the cursor will point to the end of the scan range. 1208 * <p> 1209 * Use {@link Scanner#atEnd()} to test whether the cursor is at the end 1210 * location of the scanner. 1211 * <p> 1212 * Use {@link Scanner#advance()} to move the cursor to the next key-value 1213 * pair (or end if none exists). Use seekTo methods ( 1214 * {@link Scanner#seekTo(byte[])} or 1215 * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary 1216 * location in the covered range (including backward seeking). Use 1217 * {@link Scanner#rewind()} to seek back to the beginning of the scanner. 1218 * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner. 1219 * <p> 1220 * Actual keys and values may be obtained through {@link Scanner.Entry} 1221 * object, which is obtained through {@link Scanner#entry()}. 1222 */ 1223 public static class Scanner implements Closeable { 1224 // The underlying TFile reader. 
final Reader reader;
      // current block (null if reaching end)
      private BlockReader blkReader;

      // Scan range [beginLocation, endLocation) and the cursor inside it.
      Location beginLocation;
      Location endLocation;
      Location currentLocation;

      // flag to ensure value is only examined once.
      boolean valueChecked = false;
      // reusable buffer for keys.
      final byte[] keyBuffer;
      // length of key, -1 means key is invalid.
      int klen = -1;

      // Upper bound on the number of value bytes moved per step by
      // Entry.writeValue(), and the scratch buffer used for that transfer.
      static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024;
      BytesWritable valTransferBuffer;

      // Stream view over keyBuffer, handed out by Entry.getKeyStream().
      DataInputBuffer keyDataInputStream;
      // Chunk decoder positioned on the current value, and the
      // DataInputStream wrapper around it that is handed out to callers.
      ChunkDecoder valueBufferInputStream;
      DataInputStream valueDataInputStream;
      // vlen == -1 if unknown.
      int vlen;

      /**
       * Constructor. Both offsets are converted to block-aligned locations
       * through {@code reader.getLocationNear} before delegating to the
       * location-based constructor.
       *
       * @param reader
       *          The TFile reader object.
       * @param offBegin
       *          Begin byte-offset of the scan.
       * @param offEnd
       *          End byte-offset of the scan.
       * @throws IOException
       *
       *           The offsets will be rounded to the beginning of a compressed
       *           block whose offset is greater than or equal to the specified
       *           offset.
       */
      protected Scanner(Reader reader, long offBegin, long offEnd)
          throws IOException {
        this(reader, reader.getLocationNear(offBegin), reader
            .getLocationNear(offEnd));
      }

      /**
       * Constructor. Allocates the key/value buffers and streams, then places
       * the cursor: at the end location when the range is empty, otherwise on
       * the begin location.
       *
       * @param reader
       *          The TFile reader object.
       * @param begin
       *          Begin location of the scan.
       * @param end
       *          End location of the scan.
       * @throws IOException
       */
      Scanner(Reader reader, Location begin, Location end) throws IOException {
        this.reader = reader;
        // ensure the TFile index is loaded throughout the life of scanner.
        reader.checkTFileDataIndex();
        // Note: the passed-in Location objects are stored as-is (no copy).
        beginLocation = begin;
        endLocation = end;

        valTransferBuffer = new BytesWritable();
        // TODO: remember the longest key in a TFile, and use it to replace
        // MAX_KEY_SIZE.
        keyBuffer = new byte[MAX_KEY_SIZE];
        keyDataInputStream = new DataInputBuffer();
        valueBufferInputStream = new ChunkDecoder();
        valueDataInputStream = new DataInputStream(valueBufferInputStream);

        if (beginLocation.compareTo(endLocation) >= 0) {
          // Empty range: park the cursor on a copy of the end location.
          currentLocation = new Location(endLocation);
        } else {
          // Open the first block, then skip forward to the begin record.
          currentLocation = new Location(0, 0);
          initBlock(beginLocation.getBlockIndex());
          inBlockAdvance(beginLocation.getRecordIndex());
        }
      }

      /**
       * Constructor. Starts with a scanner running from the block selected for
       * beginKey (or the reader's begin) to the reader's end, then narrows
       * beginLocation and endLocation to the exact records found for the keys.
       *
       * @param reader
       *          The TFile reader object.
       * @param beginKey
       *          Begin key of the scan. If null, scan from the first
       *          {@code <K,V>} entry of the TFile.
       * @param endKey
       *          End key of the scan. If null, scan up to the last
       *          {@code <K, V>} entry of the TFile.
       * @throws IOException
       */
      protected Scanner(Reader reader, RawComparable beginKey,
          RawComparable endKey) throws IOException {
        this(reader, (beginKey == null) ? reader.begin() : reader
            .getBlockContainsKey(beginKey, false), reader.end());
        if (beginKey != null) {
          // Walk inside the block to the first key >= beginKey and record
          // that position as the begin of the range.
          inBlockAdvance(beginKey, false);
          beginLocation.set(currentLocation);
        }
        if (endKey != null) {
          // Find the first key >= endKey, record it as the (exclusive) end,
          // then move the cursor back to the begin of the range.
          seekTo(endKey, false);
          endLocation.set(currentLocation);
          seekTo(beginLocation);
        }
      }

      /**
       * Move the cursor to the first entry whose key is greater than or equal
       * to the input key. Synonymous to seekTo(key, 0, key.length). The entry
       * returned by the previous entry() call will be invalid.
       *
       * @param key
       *          The input key
       * @return true if we find an equal key.
       * @throws IOException
       */
      public boolean seekTo(byte[] key) throws IOException {
        return seekTo(key, 0, key.length);
      }

      /**
       * Move the cursor to the first entry whose key is greater than or equal
       * to the input key. The entry returned by the previous entry() call will
       * be invalid.
*
       * @param key
       *          The input key
       * @param keyOffset
       *          offset in the key buffer.
       * @param keyLen
       *          key buffer length.
       * @return true if we find an equal key; false otherwise.
       * @throws IOException
       */
      public boolean seekTo(byte[] key, int keyOffset, int keyLen)
          throws IOException {
        return seekTo(new ByteArray(key, keyOffset, keyLen), false);
      }

      /**
       * Shared implementation of the key-based seeks.
       *
       * @param key
       *          the key to look for.
       * @param beyond
       *          passed through to the block lookup and the in-block scan;
       *          true asks for the first key strictly greater than the input
       *          key, false for the first key greater than or equal to it.
       * @return the result of the in-block scan (true if an equal key was
       *         found), or false when the target block lies at or past the
       *         end of the scan range.
       * @throws IOException
       */
      private boolean seekTo(RawComparable key, boolean beyond)
          throws IOException {
        Location l = reader.getBlockContainsKey(key, beyond);
        if (l.compareTo(beginLocation) < 0) {
          // Target block starts before the scan range: clamp to the begin.
          l = beginLocation;
        } else if (l.compareTo(endLocation) >= 0) {
          // Target is outside the scan range: park at the end.
          seekTo(endLocation);
          return false;
        }

        // check if what we are seeking is in the later part of the current
        // block.
        if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex())
            || (compareCursorKeyTo(key) >= 0)) {
          // sorry, we must seek to a different location first.
          seekTo(l);
        }

        // Cursor is now at or before the target within the right block.
        return inBlockAdvance(key, beyond);
      }

      /**
       * Move the cursor to the new location. The entry returned by the previous
       * entry() call will be invalid.
       *
       * @param l
       *          new cursor location. It must fall between the begin and end
       *          location of the scanner.
* @throws IOException
       * @throws IllegalArgumentException
       *           if the location is before the begin location or after the
       *           end location of the scanner.
       */
      private void seekTo(Location l) throws IOException {
        if (l.compareTo(beginLocation) < 0) {
          throw new IllegalArgumentException(
              "Attempt to seek before the begin location.");
        }

        if (l.compareTo(endLocation) > 0) {
          throw new IllegalArgumentException(
              "Attempt to seek after the end location.");
        }

        if (l.compareTo(endLocation) == 0) {
          parkCursorAtEnd();
          return;
        }

        if (l.getBlockIndex() != currentLocation.getBlockIndex()) {
          // going to a totally different block
          initBlock(l.getBlockIndex());
        } else {
          if (valueChecked) {
            // may temporarily go beyond the last record in the block (in which
            // case the next if loop will always be true).
            inBlockAdvance(1);
          }
          if (l.getRecordIndex() < currentLocation.getRecordIndex()) {
            // Target is behind the cursor: reopen the block and start over
            // from its first record.
            initBlock(l.getBlockIndex());
          }
        }

        // Skip forward the remaining distance within the block.
        inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex());

        return;
      }

      /**
       * Rewind to the first entry in the scanner. The entry returned by the
       * previous entry() call will be invalid.
       *
       * @throws IOException
       */
      public void rewind() throws IOException {
        seekTo(beginLocation);
      }

      /**
       * Seek to the end of the scanner. The entry returned by the previous
       * entry() call will be invalid.
       *
       * @throws IOException
       */
      public void seekToEnd() throws IOException {
        parkCursorAtEnd();
      }

      /**
       * Move the cursor to the first entry whose key is greater than or equal
       * to the input key. Synonymous to lowerBound(key, 0, key.length). The
       * entry returned by the previous entry() call will be invalid.
1455 * 1456 * @param key 1457 * The input key 1458 * @throws IOException 1459 */ 1460 public void lowerBound(byte[] key) throws IOException { 1461 lowerBound(key, 0, key.length); 1462 } 1463 1464 /** 1465 * Move the cursor to the first entry whose key is greater than or equal 1466 * to the input key. The entry returned by the previous entry() call will 1467 * be invalid. 1468 * 1469 * @param key 1470 * The input key 1471 * @param keyOffset 1472 * offset in the key buffer. 1473 * @param keyLen 1474 * key buffer length. 1475 * @throws IOException 1476 */ 1477 public void lowerBound(byte[] key, int keyOffset, int keyLen) 1478 throws IOException { 1479 seekTo(new ByteArray(key, keyOffset, keyLen), false); 1480 } 1481 1482 /** 1483 * Move the cursor to the first entry whose key is strictly greater than 1484 * the input key. Synonymous to upperBound(key, 0, key.length). The entry 1485 * returned by the previous entry() call will be invalid. 1486 * 1487 * @param key 1488 * The input key 1489 * @throws IOException 1490 */ 1491 public void upperBound(byte[] key) throws IOException { 1492 upperBound(key, 0, key.length); 1493 } 1494 1495 /** 1496 * Move the cursor to the first entry whose key is strictly greater than 1497 * the input key. The entry returned by the previous entry() call will be 1498 * invalid. 1499 * 1500 * @param key 1501 * The input key 1502 * @param keyOffset 1503 * offset in the key buffer. 1504 * @param keyLen 1505 * key buffer length. 1506 * @throws IOException 1507 */ 1508 public void upperBound(byte[] key, int keyOffset, int keyLen) 1509 throws IOException { 1510 seekTo(new ByteArray(key, keyOffset, keyLen), true); 1511 } 1512 1513 /** 1514 * Move the cursor to the next key-value pair. The entry returned by the 1515 * previous entry() call will be invalid. 1516 * 1517 * @return true if the cursor successfully moves. False when cursor is 1518 * already at the end location and cannot be advanced. 
* @throws IOException
       */
      public boolean advance() throws IOException {
        if (atEnd()) {
          return false;
        }

        int curBid = currentLocation.getBlockIndex();
        long curRid = currentLocation.getRecordIndex();
        long entriesInBlock = reader.getBlockEntryCount(curBid);
        if (curRid + 1 >= entriesInBlock) {
          // The cursor is on the last record of its block.
          if (endLocation.compareTo(curBid + 1, 0) <= 0) {
            // last entry in TFile.
            parkCursorAtEnd();
          } else {
            // last entry in Block.
            initBlock(curBid + 1);
          }
        } else {
          inBlockAdvance(1);
        }
        return true;
      }

      /**
       * Load a compressed block for reading. Expecting blockIndex is valid.
       * Any previously open block reader is closed first, the cached key is
       * invalidated, and the cursor is set to record 0 of the new block.
       *
       * @throws IOException
       */
      private void initBlock(int blockIndex) throws IOException {
        klen = -1;
        if (blkReader != null) {
          try {
            blkReader.close();
          } finally {
            // Drop the reference even if close() throws.
            blkReader = null;
          }
        }
        blkReader = reader.getBlockReader(blockIndex);
        currentLocation.set(blockIndex, 0);
      }

      /**
       * Invalidate the cached key, move the cursor to the end location and
       * close the current block reader, if any.
       *
       * @throws IOException
       */
      private void parkCursorAtEnd() throws IOException {
        klen = -1;
        currentLocation.set(endLocation);
        if (blkReader != null) {
          try {
            blkReader.close();
          } finally {
            // Drop the reference even if close() throws.
            blkReader = null;
          }
        }
      }

      /**
       * Close the scanner. Release all resources. The behavior of using the
       * scanner after calling close is not defined. The entry returned by the
       * previous entry() call will be invalid.
       */
      @Override
      public void close() throws IOException {
        parkCursorAtEnd();
      }

      /**
       * Is cursor at the end location?
       *
       * @return true if the cursor is at the end location.
       */
      public boolean atEnd() {
        return (currentLocation.compareTo(endLocation) >= 0);
      }

      /**
       * check whether we have already successfully obtained the key. It also
       * initializes the valueInputStream.
*/
      void checkKey() throws IOException {
        // A non-negative klen means the key at the cursor is already loaded.
        if (klen >= 0) return;
        if (atEnd()) {
          throw new EOFException("No key-value to read");
        }
        klen = -1;
        vlen = -1;
        valueChecked = false;

        // Read the key length and key bytes, then attach the chunk decoder to
        // the block reader for the value that follows.
        klen = Utils.readVInt(blkReader);
        blkReader.readFully(keyBuffer, 0, klen);
        valueBufferInputStream.reset(blkReader);
        if (valueBufferInputStream.isLastChunk()) {
          // The value length is recorded up-front only when the decoder
          // reports that the first chunk is also the last one.
          vlen = valueBufferInputStream.getRemain();
        }
      }

      /**
       * Get an entry to access the key and value.
       *
       * @return The Entry object to access the key and value.
       * @throws IOException
       */
      public Entry entry() throws IOException {
        checkKey();
        return new Entry();
      }

      /**
       * Get the RecordNum corresponding to the entry pointed by the cursor.
       * @return The RecordNum corresponding to the entry pointed by the cursor.
       * @throws IOException
       */
      public long getRecordNum() throws IOException {
        return reader.getRecordNumByLocation(currentLocation);
      }

      /**
       * Internal API. Comparing the key at cursor to user-specified key.
       *
       * @param other
       *          user-specified key.
       * @return negative if key at cursor is smaller than user key; 0 if equal;
       *         and positive if key at cursor greater than user key.
       * @throws IOException
       */
      int compareCursorKeyTo(RawComparable other) throws IOException {
        checkKey();
        return reader.compareKeys(keyBuffer, 0, klen, other.buffer(), other
            .offset(), other.size());
      }

      /**
       * Entry to a {@code <Key, Value>} pair. An Entry is an inner class of
       * the Scanner and reads the Scanner's key buffer and value streams
       * directly.
       */
      public class Entry implements Comparable<RawComparable> {
        /**
         * Get the length of the key.
         *
         * @return the length of the key.
         */
        public int getKeyLength() {
          return klen;
        }

        // Exposes the scanner's shared key buffer without copying.
        byte[] getKeyBuffer() {
          return keyBuffer;
        }

        /**
         * Copy the key and value in one shot into BytesWritables.
This is 1667 * equivalent to getKey(key); getValue(value); 1668 * 1669 * @param key 1670 * BytesWritable to hold key. 1671 * @param value 1672 * BytesWritable to hold value 1673 * @throws IOException 1674 */ 1675 public void get(BytesWritable key, BytesWritable value) 1676 throws IOException { 1677 getKey(key); 1678 getValue(value); 1679 } 1680 1681 /** 1682 * Copy the key into BytesWritable. The input BytesWritable will be 1683 * automatically resized to the actual key size. 1684 * 1685 * @param key 1686 * BytesWritable to hold the key. 1687 * @throws IOException 1688 */ 1689 public int getKey(BytesWritable key) throws IOException { 1690 key.setSize(getKeyLength()); 1691 getKey(key.getBytes()); 1692 return key.getLength(); 1693 } 1694 1695 /** 1696 * Copy the value into BytesWritable. The input BytesWritable will be 1697 * automatically resized to the actual value size. The implementation 1698 * directly uses the buffer inside BytesWritable for storing the value. 1699 * The call does not require the value length to be known. 1700 * 1701 * @param value 1702 * @throws IOException 1703 */ 1704 public long getValue(BytesWritable value) throws IOException { 1705 DataInputStream dis = getValueStream(); 1706 int size = 0; 1707 try { 1708 int remain; 1709 while ((remain = valueBufferInputStream.getRemain()) > 0) { 1710 value.setSize(size + remain); 1711 dis.readFully(value.getBytes(), size, remain); 1712 size += remain; 1713 } 1714 return value.getLength(); 1715 } finally { 1716 dis.close(); 1717 } 1718 } 1719 1720 /** 1721 * Writing the key to the output stream. This method avoids copying key 1722 * buffer from Scanner into user buffer, then writing to the output 1723 * stream. 1724 * 1725 * @param out 1726 * The output stream 1727 * @return the length of the key. 
1728 * @throws IOException 1729 */ 1730 public int writeKey(OutputStream out) throws IOException { 1731 out.write(keyBuffer, 0, klen); 1732 return klen; 1733 } 1734 1735 /** 1736 * Writing the value to the output stream. This method avoids copying 1737 * value data from Scanner into user buffer, then writing to the output 1738 * stream. It does not require the value length to be known. 1739 * 1740 * @param out 1741 * The output stream 1742 * @return the length of the value 1743 * @throws IOException 1744 */ 1745 public long writeValue(OutputStream out) throws IOException { 1746 DataInputStream dis = getValueStream(); 1747 long size = 0; 1748 try { 1749 int chunkSize; 1750 while ((chunkSize = valueBufferInputStream.getRemain()) > 0) { 1751 chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE); 1752 valTransferBuffer.setSize(chunkSize); 1753 dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize); 1754 out.write(valTransferBuffer.getBytes(), 0, chunkSize); 1755 size += chunkSize; 1756 } 1757 return size; 1758 } finally { 1759 dis.close(); 1760 } 1761 } 1762 1763 /** 1764 * Copy the key into user supplied buffer. 1765 * 1766 * @param buf 1767 * The buffer supplied by user. The length of the buffer must 1768 * not be shorter than the key length. 1769 * @return The length of the key. 1770 * 1771 * @throws IOException 1772 */ 1773 public int getKey(byte[] buf) throws IOException { 1774 return getKey(buf, 0); 1775 } 1776 1777 /** 1778 * Copy the key into user supplied buffer. 1779 * 1780 * @param buf 1781 * The buffer supplied by user. 1782 * @param offset 1783 * The starting offset of the user buffer where we should copy 1784 * the key into. Requiring the key-length + offset no greater 1785 * than the buffer length. 1786 * @return The length of the key. 
1787 * @throws IOException 1788 */ 1789 public int getKey(byte[] buf, int offset) throws IOException { 1790 if ((offset | (buf.length - offset - klen)) < 0) { 1791 throw new IndexOutOfBoundsException( 1792 "Bufer not enough to store the key"); 1793 } 1794 System.arraycopy(keyBuffer, 0, buf, offset, klen); 1795 return klen; 1796 } 1797 1798 /** 1799 * Streaming access to the key. Useful for desrializing the key into 1800 * user objects. 1801 * 1802 * @return The input stream. 1803 */ 1804 public DataInputStream getKeyStream() { 1805 keyDataInputStream.reset(keyBuffer, klen); 1806 return keyDataInputStream; 1807 } 1808 1809 /** 1810 * Get the length of the value. isValueLengthKnown() must be tested 1811 * true. 1812 * 1813 * @return the length of the value. 1814 */ 1815 public int getValueLength() { 1816 if (vlen >= 0) { 1817 return vlen; 1818 } 1819 1820 throw new RuntimeException("Value length unknown."); 1821 } 1822 1823 /** 1824 * Copy value into user-supplied buffer. User supplied buffer must be 1825 * large enough to hold the whole value. The value part of the key-value 1826 * pair pointed by the current cursor is not cached and can only be 1827 * examined once. Calling any of the following functions more than once 1828 * without moving the cursor will result in exception: 1829 * {@link #getValue(byte[])}, {@link #getValue(byte[], int)}, 1830 * {@link #getValueStream}. 1831 * 1832 * @return the length of the value. Does not require 1833 * isValueLengthKnown() to be true. 1834 * @throws IOException 1835 * 1836 */ 1837 public int getValue(byte[] buf) throws IOException { 1838 return getValue(buf, 0); 1839 } 1840 1841 /** 1842 * Copy value into user-supplied buffer. User supplied buffer must be 1843 * large enough to hold the whole value (starting from the offset). The 1844 * value part of the key-value pair pointed by the current cursor is not 1845 * cached and can only be examined once. 
Calling any of the following
         * functions more than once without moving the cursor will result in
         * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)},
         * {@link #getValueStream}.
         *
         * @param buf
         *          the user-supplied destination buffer.
         * @param offset
         *          position in buf at which the first value byte is stored.
         * @return the length of the value. Does not require
         *         isValueLengthKnown() to be true.
         * @throws IOException
         * @throws IndexOutOfBoundsException
         *           if the buffer cannot hold the whole value.
         */
        public int getValue(byte[] buf, int offset) throws IOException {
          DataInputStream dis = getValueStream();
          try {
            if (isValueLengthKnown()) {
              // Known length: bounds-check up front, then read in one call.
              // The bitwise OR is negative iff either operand is negative.
              if ((offset | (buf.length - offset - vlen)) < 0) {
                throw new IndexOutOfBoundsException(
                    "Buffer too small to hold value");
              }
              dis.readFully(buf, offset, vlen);
              return vlen;
            }

            // Unknown length: fill the buffer until the stream ends or the
            // buffer is full.
            int nextOffset = offset;
            while (nextOffset < buf.length) {
              int n = dis.read(buf, nextOffset, buf.length - nextOffset);
              if (n < 0) {
                break;
              }
              nextOffset += n;
            }
            if (dis.read() >= 0) {
              // attempt to read one more byte to determine whether we reached
              // the
              // end or not.
              throw new IndexOutOfBoundsException(
                  "Buffer too small to hold value");
            }
            return nextOffset - offset;
          } finally {
            dis.close();
          }
        }

        /**
         * Stream access to value. The value part of the key-value pair pointed
         * by the current cursor is not cached and can only be examined once.
         * Calling any of the following functions more than once without moving
         * the cursor will result in exception: {@link #getValue(byte[])},
         * {@link #getValue(byte[], int)}, {@link #getValueStream}.
         *
         * @return The input stream for reading the value.
1895 * @throws IOException 1896 */ 1897 public DataInputStream getValueStream() throws IOException { 1898 if (valueChecked == true) { 1899 throw new IllegalStateException( 1900 "Attempt to examine value multiple times."); 1901 } 1902 valueChecked = true; 1903 return valueDataInputStream; 1904 } 1905 1906 /** 1907 * Check whether it is safe to call getValueLength(). 1908 * 1909 * @return true if value length is known before hand. Values less than 1910 * the chunk size will always have their lengths known before 1911 * hand. Values that are written out as a whole (with advertised 1912 * length up-front) will always have their lengths known in 1913 * read. 1914 */ 1915 public boolean isValueLengthKnown() { 1916 return (vlen >= 0); 1917 } 1918 1919 /** 1920 * Compare the entry key to another key. Synonymous to compareTo(key, 0, 1921 * key.length). 1922 * 1923 * @param buf 1924 * The key buffer. 1925 * @return comparison result between the entry key with the input key. 1926 */ 1927 public int compareTo(byte[] buf) { 1928 return compareTo(buf, 0, buf.length); 1929 } 1930 1931 /** 1932 * Compare the entry key to another key. Synonymous to compareTo(new 1933 * ByteArray(buf, offset, length) 1934 * 1935 * @param buf 1936 * The key buffer 1937 * @param offset 1938 * offset into the key buffer. 1939 * @param length 1940 * the length of the key. 1941 * @return comparison result between the entry key with the input key. 1942 */ 1943 public int compareTo(byte[] buf, int offset, int length) { 1944 return compareTo(new ByteArray(buf, offset, length)); 1945 } 1946 1947 /** 1948 * Compare an entry with a RawComparable object. This is useful when 1949 * Entries are stored in a collection, and we want to compare a user 1950 * supplied key. 
*/
        @Override
        public int compareTo(RawComparable key) {
          return reader.compareKeys(keyBuffer, 0, getKeyLength(), key.buffer(),
              key.offset(), key.size());
        }

        /**
         * Compare whether this and other points to the same key value.
         */
        @Override
        public boolean equals(Object other) {
          if (this == other) return true;
          if (!(other instanceof Entry)) return false;
          return ((Entry) other).compareTo(keyBuffer, 0, getKeyLength()) == 0;
        }

        @Override
        public int hashCode() {
          return WritableComparator.hashBytes(keyBuffer, 0, getKeyLength());
        }
      }

      /**
       * Advance cursor by n positions within the block.
       * 
       * @param n
       *          Number of key-value pairs to skip in block.
       * @throws IOException
       */
      private void inBlockAdvance(long n) throws IOException {
        for (long i = 0; i < n; ++i) {
          checkKey();
          if (!valueBufferInputStream.isClosed()) {
            valueBufferInputStream.close();
          }
          // Mark the key as not loaded before stepping to the next record.
          klen = -1;
          currentLocation.incRecordIndex();
        }
      }

      /**
       * Advance cursor in block until we find a key that is greater than or
       * equal to the input key.
       * 
       * @param key
       *          Key to compare.
       * @param greater
       *          advance until we find a key greater than the input key.
       * @return true if we find a equal key.
       * @throws IOException
       * @throws RuntimeException
       *           if the end of the searchable range of the block is reached
       *           without finding a qualifying key.
       */
      private boolean inBlockAdvance(RawComparable key, boolean greater)
          throws IOException {
        int curBid = currentLocation.getBlockIndex();
        long entryInBlock = reader.getBlockEntryCount(curBid);
        // In the block holding the end location, stop at the end location's
        // record index rather than at the block's full entry count.
        if (curBid == endLocation.getBlockIndex()) {
          entryInBlock = endLocation.getRecordIndex();
        }

        while (currentLocation.getRecordIndex() < entryInBlock) {
          int cmp = compareCursorKeyTo(key);
          if (cmp > 0) return false;
          if (cmp == 0 && !greater) return true;
          if (!valueBufferInputStream.isClosed()) {
            valueBufferInputStream.close();
          }
          klen = -1;
          currentLocation.incRecordIndex();
        }

        throw new RuntimeException("Cannot find matching key in block.");
      }
    }

    /**
     * @param curBid
     *          index of a data block.
     * @return the number of key-value entries recorded in the TFile index for
     *         that block.
     */
    long getBlockEntryCount(int curBid) {
      return tfileIndex.getEntry(curBid).entries();
    }

    /**
     * @param blockIndex
     *          index of a data block.
     * @return a reader for that data block, obtained from the underlying
     *         BCFile reader.
     * @throws IOException
     */
    BlockReader getBlockReader(int blockIndex) throws IOException {
      return readerBCF.getDataBlock(blockIndex);
    }
  }

  /**
   * Data structure representing "TFile.meta" meta block.
   */
  static final class TFileMeta {
    final static String BLOCK_NAME = "TFile.meta";
    final Version version;
    private long recordCount;
    private final String strComparator;
    private final BytesComparator comparator;

    // ctor for writes
    public TFileMeta(String comparator) {
      // set fileVersion to API version when we create it.
      version = TFile.API_VERSION;
      recordCount = 0;
      // A null comparator name is stored as the empty string (unsorted).
      strComparator = (comparator == null) ? "" : comparator;
      this.comparator = makeComparator(strComparator);
    }

    // ctor for reads
    public TFileMeta(DataInput in) throws IOException {
      version = new Version(in);
      if (!version.compatibleWith(TFile.API_VERSION)) {
        throw new RuntimeException("Incompatible TFile fileVersion.");
      }
      recordCount = Utils.readVLong(in);
      strComparator = Utils.readString(in);
      comparator = makeComparator(strComparator);
    }

    /**
     * Build the key comparator described by a comparator string.
     * 
     * @param comparator
     *          comparator string; must not be null.
     * @return null if the string is empty (unsorted keys); otherwise a
     *         BytesComparator wrapping either the memcmp comparator or an
     *         instance of the named Java class created through its default
     *         constructor.
     * @throws IllegalArgumentException
     *           if the string is not a supported comparator, or the named
     *           class cannot be loaded or instantiated (the underlying
     *           failure is attached as the cause).
     */
    @SuppressWarnings("unchecked")
    static BytesComparator makeComparator(String comparator) {
      if (comparator.length() == 0) {
        // unsorted keys
        return null;
      }
      if (comparator.equals(COMPARATOR_MEMCMP)) {
        // default comparator
        return new BytesComparator(new MemcmpRawComparator());
      } else if (comparator.startsWith(COMPARATOR_JCLASS)) {
        String compClassName =
            comparator.substring(COMPARATOR_JCLASS.length()).trim();
        try {
          Class<?> compClass = Class.forName(compClassName);
          // use its default ctor to create an instance
          return new BytesComparator((RawComparator<Object>) compClass
              .newInstance());
        } catch (Exception e) {
          // Keep the original failure as the cause so the stack trace of the
          // class-loading/instantiation problem is not lost.
          throw new IllegalArgumentException(
              "Failed to instantiate comparator: " + comparator + "("
                  + e.toString() + ")", e);
        }
      } else {
        throw new IllegalArgumentException("Unsupported comparator: "
            + comparator);
      }
    }

    /**
     * Serialize this meta block: API version, record count, comparator
     * string, in that order.
     */
    public void write(DataOutput out) throws IOException {
      TFile.API_VERSION.write(out);
      Utils.writeVLong(out, recordCount);
      Utils.writeString(out, strComparator);
    }

    public long getRecordCount() {
      return recordCount;
    }

    public void incRecordCount() {
      ++recordCount;
    }

    /**
     * @return true if a comparator string is set, i.e. it is non-empty.
     */
    public boolean isSorted() {
      return !strComparator.isEmpty();
    }

    public String getComparatorString() {
      return strComparator;
    }

    /**
     * @return the key comparator, or null when the comparator string is
     *         empty.
     */
    public BytesComparator getComparator() {
      return comparator;
    }

    public Version getVersion() {
      return version;
    }
  } // END: class TFileMeta

  /**
   * Data structure representing "TFile.index" meta block.
   */
  static class TFileIndex {
    final static String BLOCK_NAME = "TFile.index";
    private ByteArray firstKey;
    private final ArrayList<TFileIndexEntry> index;
    // recordNumIndex.get(i) is the total number of records in blocks 0..i.
    private final ArrayList<Long> recordNumIndex;
    private final BytesComparator comparator;
    // Running total of records over all index entries added so far.
    private long sum = 0;

    /**
     * For reading from file.
     * 
     * @param entryCount
     *          number of index entries to read.
     * @param in
     *          input positioned at the start of the serialized index.
     * @param comparator
     *          key comparator; null for an unsorted TFile.
     * @throws IOException
     * @throws RuntimeException
     *           if the first-key record is empty but entryCount is non-zero.
     */
    public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
        throws IOException {
      index = new ArrayList<TFileIndexEntry>(entryCount);
      recordNumIndex = new ArrayList<Long>(entryCount);
      int size = Utils.readVInt(in); // size for the first key entry.
      if (size > 0) {
        byte[] buffer = new byte[size];
        in.readFully(buffer);
        DataInputStream firstKeyInputStream =
            new DataInputStream(new ByteArrayInputStream(buffer, 0, size));

        int firstKeyLength = Utils.readVInt(firstKeyInputStream);
        firstKey = new ByteArray(new byte[firstKeyLength]);
        firstKeyInputStream.readFully(firstKey.buffer());

        for (int i = 0; i < entryCount; i++) {
          size = Utils.readVInt(in);
          // Reuse the scratch buffer; grow it only when an entry is larger.
          if (buffer.length < size) {
            buffer = new byte[size];
          }
          in.readFully(buffer, 0, size);
          TFileIndexEntry idx =
              new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
                  buffer, 0, size)));
          index.add(idx);
          sum += idx.entries();
          recordNumIndex.add(sum);
        }
      } else if (entryCount != 0) {
        throw new RuntimeException("Internal error");
      }
      this.comparator = comparator;
    }

    /**
     * @param key
     *          input key.
     * @return the ID of the first block that contains key >= input key. Or -1
     *         if no such block exists.
     */
    public int lowerBound(RawComparable key) {
      if (comparator == null) {
        throw new RuntimeException("Cannot search in unsorted TFile");
      }

      if (firstKey == null) {
        return -1; // not found
      }

      int ret = Utils.lowerBound(index, key, comparator);
      if (ret == index.size()) {
        return -1;
      }
      return ret;
    }

    /**
     * @param key
     *          input key.
     * @return the ID of the first block that contains key > input key. Or -1
     *         if no such block exists.
     */
    public int upperBound(RawComparable key) {
      if (comparator == null) {
        throw new RuntimeException("Cannot search in unsorted TFile");
      }

      if (firstKey == null) {
        return -1; // not found
      }

      int ret = Utils.upperBound(index, key, comparator);
      if (ret == index.size()) {
        return -1;
      }
      return ret;
    }

    /**
     * For writing to file.
     */
    public TFileIndex(BytesComparator comparator) {
      index = new ArrayList<TFileIndexEntry>();
      recordNumIndex = new ArrayList<Long>();
      this.comparator = comparator;
    }

    /**
     * @return the first key, or null if none has been set or read.
     */
    public RawComparable getFirstKey() {
      return firstKey;
    }

    /**
     * Translate a global record number into a (block, record-in-block)
     * location using the cumulative record counts.
     * 
     * @param recNum
     *          global record number.
     * @return the location built from the block index chosen by
     *         Utils.upperBound over the cumulative counts and the offset of
     *         recNum past the records of all preceding blocks.
     */
    public Reader.Location getLocationByRecordNum(long recNum) {
      int idx = Utils.upperBound(recordNumIndex, recNum);
      long lastRecNum = (idx == 0) ? 0 : recordNumIndex.get(idx - 1);
      return new Reader.Location(idx, recNum - lastRecNum);
    }

    /**
     * Translate a location into a global record number: the cumulative
     * record count of all preceding blocks plus the record index within the
     * location's block.
     */
    public long getRecordNumByLocation(Reader.Location location) {
      int blkIndex = location.getBlockIndex();
      long lastRecNum = (blkIndex == 0) ? 0 : recordNumIndex.get(blkIndex - 1);
      return lastRecNum + location.getRecordIndex();
    }

    /**
     * Set the first key to a private copy of the given byte range.
     */
    public void setFirstKey(byte[] key, int offset, int length) {
      firstKey = new ByteArray(new byte[length]);
      System.arraycopy(key, offset, firstKey.buffer(), 0, length);
    }

    /**
     * @return the key of the last index entry wrapped in a ByteArray, or null
     *         if the index has no entries.
     */
    public RawComparable getLastKey() {
      if (index.size() == 0) {
        return null;
      }
      return new ByteArray(index.get(index.size() - 1).buffer());
    }

    /**
     * Append an index entry and extend the cumulative record count.
     */
    public void addEntry(TFileIndexEntry keyEntry) {
      index.add(keyEntry);
      sum += keyEntry.entries();
      recordNumIndex.add(sum);
    }

    public TFileIndexEntry getEntry(int bid) {
      return index.get(bid);
    }

    /**
     * Serialize the index. Writes a single VInt 0 when no first key is set;
     * otherwise writes the length-prefixed first-key record followed by one
     * length-prefixed record per index entry.
     */
    public void write(DataOutput out) throws IOException {
      if (firstKey == null) {
        Utils.writeVInt(out, 0);
        return;
      }

      DataOutputBuffer dob = new DataOutputBuffer();
      Utils.writeVInt(dob, firstKey.size());
      // Write exactly the bytes the declared length covers.
      dob.write(firstKey.buffer(), firstKey.offset(), firstKey.size());
      // Use getLength() for both prefix and payload, as the loop below does.
      Utils.writeVInt(out, dob.getLength());
      out.write(dob.getData(), 0, dob.getLength());

      for (TFileIndexEntry entry : index) {
        dob.reset();
        entry.write(dob);
        Utils.writeVInt(out, dob.getLength());
        out.write(dob.getData(), 0, dob.getLength());
      }
    }
  }

  /**
   * TFile Data Index entry. We should try to make the memory footprint of each
   * index entry as small as possible.
   */
  static final class TFileIndexEntry implements RawComparable {
    final byte[] key;
    // count of <key, value> entries in the block.
    final long kvEntries;

    /**
     * Read an entry: VInt key length, key bytes, VLong entry count.
     */
    public TFileIndexEntry(DataInput in) throws IOException {
      int len = Utils.readVInt(in);
      key = new byte[len];
      in.readFully(key, 0, len);
      kvEntries = Utils.readVLong(in);
    }

    // default entry, without any padding
    public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) {
      key = new byte[len];
      System.arraycopy(newkey, offset, key, 0, len);
      this.kvEntries = entries;
    }

    @Override
    public byte[] buffer() {
      return key;
    }

    @Override
    public int offset() {
      return 0;
    }

    @Override
    public int size() {
      return key.length;
    }

    long entries() {
      return kvEntries;
    }

    /**
     * Write the entry in the same layout the reading constructor expects.
     */
    public void write(DataOutput out) throws IOException {
      Utils.writeVInt(out, key.length);
      out.write(key, 0, key.length);
      Utils.writeVLong(out, kvEntries);
    }
  }

  /**
   * Dumping the TFile information.
   * 
   * @param args
   *          A list of TFile paths.
   */
  public static void main(String[] args) {
    System.out.printf("TFile Dumper (TFile %s, BCFile %s)\n", TFile.API_VERSION
        .toString(), BCFile.API_VERSION.toString());
    if (args.length == 0) {
      System.out
          .println("Usage: java ... org.apache.hadoop.io.file.tfile.TFile tfile-path [tfile-path ...]");
      System.exit(0);
    }
    Configuration conf = new Configuration();

    for (String file : args) {
      System.out.println("===" + file + "===");
      try {
        TFileDumper.dumpInfo(file, System.out, conf);
      } catch (IOException e) {
        // Report the failure and continue with the remaining files.
        e.printStackTrace(System.err);
      }
    }
  }
}