/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
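 *
 * <p>For example, a file might be written and read back as follows. This is a
 * minimal sketch: the path, the key/value types and the compression choice are
 * illustrative, not prescribed.</p>
 *
 * <pre>
 * Configuration conf = new Configuration();
 * Path path = new Path("/tmp/example.seq");          // illustrative path
 *
 * // Write: createWriter picks the Writer subclass for the compression type
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(IntWritable.class),
 *     SequenceFile.Writer.valueClass(Text.class),
 *     SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK));
 * try {
 *   writer.append(new IntWritable(1), new Text("one"));
 * } finally {
 *   writer.close();
 * }
 *
 * // Read: the Reader handles all three formats transparently
 * SequenceFile.Reader reader =
 *     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 * try {
 *   IntWritable key = new IntWritable();
 *   Text value = new Text();
 *   while (reader.next(key, value)) {
 *     System.out.println(key + "\t" + value);
 *   }
 * } finally {
 *   reader.close();
 * }
 * </pre>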
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
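 *
 * <p>For illustration, a byte-level sketch of a current (version 6) header
 * follows; the class names and flag values are examples only:</p>
 *
 * <pre>
 * 53 45 51 06    // magic "SEQ" followed by the version byte (6)
 * ...            // key class name, then value class name
 *                // (each a vint length followed by UTF-8 bytes)
 * 00|01          // compression? (one boolean byte)
 * 00|01          // blockCompression? (one boolean byte)
 * ...            // codec class name (present only if compressed)
 * ...            // metadata (count, then Text name/value pairs)
 * xx xx ... xx   // 16 random bytes: the sync marker ending the header
 * </pre>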
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every <code>SYNC_INTERVAL</code> (2000) bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every <code>SYNC_INTERVAL</code> (2000) bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every block.
 *   </li>
 * </ul>
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
 * format.</p>
 *
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the compression type for the reduce outputs.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  public static CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  public static void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileContext fc, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata,
                 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
    throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
        keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }


  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream.
     * Note that it will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
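   *
   * <p>For example (a sketch; the attribute names are illustrative):</p>
   * <pre>
   * SequenceFile.Metadata metadata = new SequenceFile.Metadata();
   * metadata.set(new Text("created-by"), new Text("example-job"));
   * Text createdBy = metadata.get(new Text("created-by"));  // "example-job"
   * </pre>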
   *
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) {
        throw new IOException("Invalid size: " + sz +
                              " for file metadata object");
      }
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString())
          .append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }

  /** Write key/value pairs to a sequence-format file.
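   *
   * <p>A typical write lifecycle, as a minimal sketch ({@code conf},
   * {@code path} and the record types are illustrative):</p>
   * <pre>
   * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *     SequenceFile.Writer.file(path),
   *     SequenceFile.Writer.keyClass(LongWritable.class),
   *     SequenceFile.Writer.valueClass(Text.class));
   * try {
   *   writer.append(new LongWritable(1), new Text("first record"));
   *   writer.hflush();             // optionally expose data to readers early
   * } finally {
   *   writer.close();
   * }
   * </pre>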
   */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(Charsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                            implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     * methods that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                  implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     * methods that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option...
           opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException("file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();
        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
        Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
        Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
        new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
" 1193 + "Please ensure that the configuration '" + 1194 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1195 + "properly configured, if you're using" 1196 + "custom serialization."); 1197 } 1198 this.uncompressedValSerializer.open(buffer); 1199 if (this.codec != null) { 1200 ReflectionUtils.setConf(this.codec, this.conf); 1201 this.compressor = CodecPool.getCompressor(this.codec); 1202 this.deflateFilter = this.codec.createOutputStream(buffer, compressor); 1203 this.deflateOut = 1204 new DataOutputStream(new BufferedOutputStream(deflateFilter)); 1205 this.compressedValSerializer = serializationFactory.getSerializer(valClass); 1206 if (this.compressedValSerializer == null) { 1207 throw new IOException( 1208 "Could not find a serializer for the Value class: '" 1209 + valClass.getCanonicalName() + "'. " 1210 + "Please ensure that the configuration '" + 1211 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1212 + "properly configured, if you're using" 1213 + "custom serialization."); 1214 } 1215 this.compressedValSerializer.open(deflateOut); 1216 } 1217 writeFileHeader(); 1218 } 1219 1220 /** Returns the class of keys in this file. */ 1221 public Class getKeyClass() { return keyClass; } 1222 1223 /** Returns the class of values in this file. */ 1224 public Class getValueClass() { return valClass; } 1225 1226 /** Returns the compression codec of data in this file. */ 1227 public CompressionCodec getCompressionCodec() { return codec; } 1228 1229 /** create a sync point */ 1230 public void sync() throws IOException { 1231 if (sync != null && lastSyncPos != out.getPos()) { 1232 out.writeInt(SYNC_ESCAPE); // mark the start of the sync 1233 out.write(sync); // write sync 1234 lastSyncPos = out.getPos(); // update lastSyncPos 1235 } 1236 } 1237 1238 /** 1239 * flush all currently written data to the file system 1240 * @deprecated Use {@link #hsync()} or {@link #hflush()} instead 1241 */ 1242 @Deprecated 1243 public void syncFs() throws IOException { 1244 if (out != null) { 1245 out.sync(); // flush contents to file system 1246 } 1247 } 1248 1249 @Override 1250 public void hsync() throws IOException { 1251 if (out != null) { 1252 out.hsync(); 1253 } 1254 } 1255 1256 @Override 1257 public void hflush() throws IOException { 1258 if (out != null) { 1259 out.hflush(); 1260 } 1261 } 1262 1263 /** Returns the configuration of this file. */ 1264 Configuration getConf() { return conf; } 1265 1266 /** Close the file. */ 1267 @Override 1268 public synchronized void close() throws IOException { 1269 keySerializer.close(); 1270 uncompressedValSerializer.close(); 1271 if (compressedValSerializer != null) { 1272 compressedValSerializer.close(); 1273 } 1274 1275 CodecPool.returnCompressor(compressor); 1276 compressor = null; 1277 1278 if (out != null) { 1279 1280 // Close the underlying stream iff we own it... 1281 if (ownOutputStream) { 1282 out.close(); 1283 } else { 1284 out.flush(); 1285 } 1286 out = null; 1287 } 1288 } 1289 1290 synchronized void checkAndWriteSync() throws IOException { 1291 if (sync != null && 1292 out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync 1293 sync(); 1294 } 1295 } 1296 1297 /** Append a key/value pair. */ 1298 public void append(Writable key, Writable val) 1299 throws IOException { 1300 append((Object) key, (Object) val); 1301 } 1302 1303 /** Append a key/value pair. 
     */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position. In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a
     * position returned by this method, {@link SequenceFile.Reader#next(Writable)}
     * may be called. However the key may be earlier in the file than the key
     * last written when this method was called (e.g., with block-compression,
     * it may be the first key in the block that was being written when this
     * method was called).
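     *
     * <p>For example (a sketch; {@code writer} and {@code reader} refer to the
     * same file, and {@code key} is a reusable key instance):</p>
     * <pre>
     * long pos = writer.getLength();   // always lands on a sync boundary
     * ...
     * reader.seek(pos);
     * reader.next(key);                // reads the first record at/after pos
     * </pre>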
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }

  } // class Writer

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressionWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;

    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize =
        conf.getInt("io.seqfile.compress.blocksize", 1000000);
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0,
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }

    /** Compress and flush contents to dfs */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressionWriter

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt("io.file.buffer.size", 4096);
  }

  /** Reads key/value pairs from a sequence-format file.
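   *
   * <p>A typical read loop, as a minimal sketch ({@code conf} and {@code path}
   * are illustrative; the key/value types must match the file's header):</p>
   * <pre>
   * SequenceFile.Reader reader =
   *     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
   * try {
   *   Writable key = (Writable)
   *     ReflectionUtils.newInstance(reader.getKeyClass(), conf);
   *   Writable value = (Writable)
   *     ReflectionUtils.newInstance(reader.getValueClass(), conf);
   *   while (reader.next(key, value)) {
   *     // process key/value
   *   }
   * } finally {
   *   reader.close();
   * }
   * </pre>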
   */
  public static class Reader implements java.io.Closeable {
    private String filename;
    private FSDataInputStream in;
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;

    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;

    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;

    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;

    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;

    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /**
     * A tag interface for all of the Reader options
     */
    public static interface Option {}

    /**
     * Create an option to specify the path name of the sequence file.
     * @param value the path to read
     * @return a new option
     */
    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * Create an option to specify the stream with the sequence file.
     * @param value the stream to read.
     * @return a new option
     */
    public static Option stream(FSDataInputStream value) {
      return new InputStreamOption(value);
    }

    /**
     * Create an option to specify the starting byte to read.
     * @param value the number of bytes to skip over
     * @return a new option
     */
    public static Option start(long value) {
      return new StartOption(value);
    }

    /**
     * Create an option to specify the number of bytes to read.
     * @param value the number of bytes to read
     * @return a new option
     */
    public static Option length(long value) {
      return new LengthOption(value);
    }

    /**
     * Create an option with the buffer size for reading the given pathname.
     * @param value the number of bytes to buffer
     * @return a new option
     */
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    private static class FileOption extends Options.PathOption
                                    implements Option {
      private FileOption(Path value) {
        super(value);
      }
    }

    private static class InputStreamOption
        extends Options.FSDataInputStreamOption
        implements Option {
      private InputStreamOption(FSDataInputStream value) {
        super(value);
      }
    }

    private static class StartOption extends Options.LongOption
                                     implements Option {
      private StartOption(long value) {
        super(value);
      }
    }

    private static class LengthOption extends Options.LongOption
                                      implements Option {
      private LengthOption(long value) {
        super(value);
      }
    }

    private static class BufferSizeOption extends Options.IntegerOption
                                          implements Option {
      private BufferSizeOption(int value) {
        super(value);
      }
    }

    // only used directly
    private static class OnlyHeaderOption extends Options.BooleanOption
                                          implements Option {
      private OnlyHeaderOption() {
        super(true);
      }
    }

    public Reader(Configuration conf, Option... opts) throws IOException {
      // Look up the options; these are null if not set
      FileOption fileOpt = Options.getOption(FileOption.class, opts);
      InputStreamOption streamOpt =
        Options.getOption(InputStreamOption.class, opts);
      StartOption startOpt = Options.getOption(StartOption.class, opts);
      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
      OnlyHeaderOption headerOnly =
        Options.getOption(OnlyHeaderOption.class, opts);
      // check for consistency
      if ((fileOpt == null) == (streamOpt == null)) {
        throw new
          IllegalArgumentException("File or stream option must be specified");
      }
      if (fileOpt == null && bufOpt != null) {
        throw new IllegalArgumentException("buffer size can only be set when" +
                                           " a file is specified.");
      }
      // figure out the real values
      Path filename = null;
      FSDataInputStream file;
      final long len;
      if (fileOpt != null) {
        filename = fileOpt.getValue();
        FileSystem fs = filename.getFileSystem(conf);
        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
        len = null == lenOpt
          ? fs.getFileStatus(filename).getLen()
          : lenOpt.getValue();
        file = openFile(fs, filename, bufSize, len);
      } else {
        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
        file = streamOpt.getValue();
      }
      long start = startOpt == null ? 0 : startOpt.getValue();
      // really set up
      initialize(filename, file, start, len, conf, headerOnly != null);
    }

    /**
     * Construct a reader by opening a file from the given file system.
     * @param fs The file system used to open the file.
     * @param file The file being read.
     * @param conf Configuration
     * @throws IOException
     * @deprecated Use Reader(Configuration, Option...) instead.
     */
    @Deprecated
    public Reader(FileSystem fs, Path file,
                  Configuration conf) throws IOException {
      this(conf, file(file.makeQualified(fs)));
    }

    /**
     * Construct a reader by the given input stream.
     * @param in An input stream.
* @param buffersize unused; retained for signature compatibility 1783 * @param start The starting position. 1784 * @param length The length being read. 1785 * @param conf Configuration 1786 * @throws IOException 1787 * @deprecated Use Reader(Configuration, Reader.Option...) instead. 1788 */ 1789 @Deprecated 1790 public Reader(FSDataInputStream in, int buffersize, 1791 long start, long length, Configuration conf) throws IOException { 1792 this(conf, stream(in), start(start), length(length)); 1793 } 1794 1795 /** Common work of the constructors. */ 1796 private void initialize(Path filename, FSDataInputStream in, 1797 long start, long length, Configuration conf, 1798 boolean tempReader) throws IOException { 1799 if (in == null) { 1800 throw new IllegalArgumentException("in == null"); 1801 } 1802 this.filename = filename == null ? "<unknown>" : filename.toString(); 1803 this.in = in; 1804 this.conf = conf; 1805 boolean succeeded = false; 1806 try { 1807 seek(start); 1808 this.end = this.in.getPos() + length; 1809 // if it wrapped around, use the max 1810 if (end < length) { 1811 end = Long.MAX_VALUE; 1812 } 1813 init(tempReader); 1814 succeeded = true; 1815 } finally { 1816 if (!succeeded) { 1817 IOUtils.cleanup(LOG, this.in); 1818 } 1819 } 1820 } 1821 1822 /** 1823 * Override this method to specialize the type of 1824 * {@link FSDataInputStream} returned. 1825 * @param fs The file system used to open the file. 1826 * @param file The file being read. 1827 * @param bufferSize The buffer size used to read the file. 1828 * @param length The length being read if it is >= 0. Otherwise, 1829 * the length is not available. 1830 * @return The opened stream. 1831 * @throws IOException 1832 */ 1833 protected FSDataInputStream openFile(FileSystem fs, Path file, 1834 int bufferSize, long length) throws IOException { 1835 return fs.open(file, bufferSize); 1836 } 1837 1838 /** 1839 * Initialize the {@link Reader}. 1840 * @param tempReader <code>true</code> if we are constructing a temporary 1841 * reader (see {@link SequenceFile.Sorter#cloneFileAttributes}), 1842 * and hence do not initialize every component; 1843 * <code>false</code> otherwise. 1844 * @throws IOException 1845 */ 1846 private void init(boolean tempReader) throws IOException { 1847 byte[] versionBlock = new byte[VERSION.length]; 1848 in.readFully(versionBlock); 1849 1850 if ((versionBlock[0] != VERSION[0]) || 1851 (versionBlock[1] != VERSION[1]) || 1852 (versionBlock[2] != VERSION[2])) 1853 throw new IOException(this + " not a SequenceFile"); 1854 1855 // Set 'version' 1856 version = versionBlock[3]; 1857 if (version > VERSION[3]) 1858 throw new VersionMismatchException(VERSION[3], version); 1859 1860 if (version < BLOCK_COMPRESS_VERSION) { 1861 UTF8 className = new UTF8(); 1862 1863 className.readFields(in); 1864 keyClassName = className.toStringChecked(); // key class name 1865 1866 className.readFields(in); 1867 valClassName = className.toStringChecked(); // val class name 1868 } else { 1869 keyClassName = Text.readString(in); 1870 valClassName = Text.readString(in); 1871 } 1872 1873 if (version > 2) { // if version > 2 1874 this.decompress = in.readBoolean(); // is compressed? 1875 } else { 1876 decompress = false; 1877 } 1878 1879 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 1880 this.blockCompressed = in.readBoolean(); // is block-compressed?
} else { 1882 blockCompressed = false; 1883 } 1884 1885 // if version >= 5 1886 // setup the compression codec 1887 if (decompress) { 1888 if (version >= CUSTOM_COMPRESS_VERSION) { 1889 String codecClassname = Text.readString(in); 1890 try { 1891 Class<? extends CompressionCodec> codecClass 1892 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); 1893 this.codec = ReflectionUtils.newInstance(codecClass, conf); 1894 } catch (ClassNotFoundException cnfe) { 1895 throw new IllegalArgumentException("Unknown codec: " + 1896 codecClassname, cnfe); 1897 } 1898 } else { 1899 codec = new DefaultCodec(); 1900 ((Configurable)codec).setConf(conf); 1901 } 1902 } 1903 1904 this.metadata = new Metadata(); 1905 if (version >= VERSION_WITH_METADATA) { // if version >= 6 1906 this.metadata.readFields(in); 1907 } 1908 1909 if (version > 1) { // if version > 1 1910 in.readFully(sync); // read sync bytes 1911 headerEnd = in.getPos(); // record end of header 1912 } 1913 1914 // Initialize... *not* if we are constructing a temporary Reader 1915 if (!tempReader) { 1916 valBuffer = new DataInputBuffer(); 1917 if (decompress) { 1918 valDecompressor = CodecPool.getDecompressor(codec); 1919 valInFilter = codec.createInputStream(valBuffer, valDecompressor); 1920 valIn = new DataInputStream(valInFilter); 1921 } else { 1922 valIn = valBuffer; 1923 } 1924 1925 if (blockCompressed) { 1926 keyLenBuffer = new DataInputBuffer(); 1927 keyBuffer = new DataInputBuffer(); 1928 valLenBuffer = new DataInputBuffer(); 1929 1930 keyLenDecompressor = CodecPool.getDecompressor(codec); 1931 keyLenInFilter = codec.createInputStream(keyLenBuffer, 1932 keyLenDecompressor); 1933 keyLenIn = new DataInputStream(keyLenInFilter); 1934 1935 keyDecompressor = CodecPool.getDecompressor(codec); 1936 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); 1937 keyIn = new DataInputStream(keyInFilter); 1938 1939 valLenDecompressor = CodecPool.getDecompressor(codec); 1940 valLenInFilter = codec.createInputStream(valLenBuffer, 1941 valLenDecompressor); 1942 valLenIn = new DataInputStream(valLenInFilter); 1943 } 1944 1945 SerializationFactory serializationFactory = 1946 new SerializationFactory(conf); 1947 this.keyDeserializer = 1948 getDeserializer(serializationFactory, getKeyClass()); 1949 if (this.keyDeserializer == null) { 1950 throw new IOException( 1951 "Could not find a deserializer for the Key class: '" 1952 + getKeyClass().getCanonicalName() + "'. " 1953 + "Please ensure that the configuration '" + 1954 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1955 + "properly configured, if you're using " 1956 + "custom serialization."); 1957 } 1958 if (!blockCompressed) { 1959 this.keyDeserializer.open(valBuffer); 1960 } else { 1961 this.keyDeserializer.open(keyIn); 1962 } 1963 this.valDeserializer = 1964 getDeserializer(serializationFactory, getValueClass()); 1965 if (this.valDeserializer == null) { 1966 throw new IOException( 1967 "Could not find a deserializer for the Value class: '" 1968 + getValueClass().getCanonicalName() + "'. " 1969 + "Please ensure that the configuration '" + 1970 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1971 + "properly configured, if you're using " 1972 + "custom serialization."); 1973 } 1974 this.valDeserializer.open(valIn); 1975 } 1976 } 1977 1978 @SuppressWarnings("unchecked") 1979 private Deserializer getDeserializer(SerializationFactory sf, Class c) { 1980 return sf.getDeserializer(c); 1981 } 1982 1983 /** Close the file.
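 * <p>Since {@link Reader} implements {@link java.io.Closeable}, the usual
 * try/finally pattern applies; a sketch (assumes {@code conf} and
 * {@code path} are available):</p>
 * <pre>
 *   Reader reader = new Reader(conf, Reader.file(path));
 *   try {
 *     // ... read records ...
 *   } finally {
 *     reader.close(); // returns pooled decompressors, closes the stream
 *   }
 * </pre>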
*/ 1984 @Override 1985 public synchronized void close() throws IOException { 1986 // Return the decompressors to the pool 1987 CodecPool.returnDecompressor(keyLenDecompressor); 1988 CodecPool.returnDecompressor(keyDecompressor); 1989 CodecPool.returnDecompressor(valLenDecompressor); 1990 CodecPool.returnDecompressor(valDecompressor); 1991 keyLenDecompressor = keyDecompressor = null; 1992 valLenDecompressor = valDecompressor = null; 1993 1994 if (keyDeserializer != null) { 1995 keyDeserializer.close(); 1996 } 1997 if (valDeserializer != null) { 1998 valDeserializer.close(); 1999 } 2000 2001 // Close the input-stream 2002 in.close(); 2003 } 2004 2005 /** Returns the name of the key class. */ 2006 public String getKeyClassName() { 2007 return keyClassName; 2008 } 2009 2010 /** Returns the class of keys in this file. */ 2011 public synchronized Class<?> getKeyClass() { 2012 if (null == keyClass) { 2013 try { 2014 keyClass = WritableName.getClass(getKeyClassName(), conf); 2015 } catch (IOException e) { 2016 throw new RuntimeException(e); 2017 } 2018 } 2019 return keyClass; 2020 } 2021 2022 /** Returns the name of the value class. */ 2023 public String getValueClassName() { 2024 return valClassName; 2025 } 2026 2027 /** Returns the class of values in this file. */ 2028 public synchronized Class<?> getValueClass() { 2029 if (null == valClass) { 2030 try { 2031 valClass = WritableName.getClass(getValueClassName(), conf); 2032 } catch (IOException e) { 2033 throw new RuntimeException(e); 2034 } 2035 } 2036 return valClass; 2037 } 2038 2039 /** Returns true if values are compressed. */ 2040 public boolean isCompressed() { return decompress; } 2041 2042 /** Returns true if records are block-compressed. */ 2043 public boolean isBlockCompressed() { return blockCompressed; } 2044 2045 /** Returns the compression codec of data in this file. */ 2046 public CompressionCodec getCompressionCodec() { return codec; } 2047 2048 /** 2049 * Get the compression type for this file. 2050 * @return the compression type 2051 */ 2052 public CompressionType getCompressionType() { 2053 if (decompress) { 2054 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 2055 } else { 2056 return CompressionType.NONE; 2057 } 2058 } 2059 2060 /** Returns the metadata object of the file */ 2061 public Metadata getMetadata() { 2062 return this.metadata; 2063 } 2064 2065 /** Returns the configuration used for this file. 
*/ 2066 Configuration getConf() { return conf; } 2067 2068 /** Read a compressed buffer */ 2069 private synchronized void readBuffer(DataInputBuffer buffer, 2070 CompressionInputStream filter) throws IOException { 2071 // Read data into a temporary buffer 2072 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 2073 2074 try { 2075 int dataBufferLength = WritableUtils.readVInt(in); 2076 dataBuffer.write(in, dataBufferLength); 2077 2078 // Set up 'buffer' connected to the input-stream 2079 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 2080 } finally { 2081 dataBuffer.close(); 2082 } 2083 2084 // Reset the codec 2085 filter.resetState(); 2086 } 2087 2088 /** Read the next 'compressed' block */ 2089 private synchronized void readBlock() throws IOException { 2090 // Check if we need to throw away a whole block of 2091 // 'values' due to 'lazy decompression' 2092 if (lazyDecompress && !valuesDecompressed) { 2093 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2094 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2095 } 2096 2097 // Reset internal states 2098 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2099 valuesDecompressed = false; 2100 2101 //Process sync 2102 if (sync != null) { 2103 in.readInt(); 2104 in.readFully(syncCheck); // read syncCheck 2105 if (!Arrays.equals(sync, syncCheck)) // check it 2106 throw new IOException("File is corrupt!"); 2107 } 2108 syncSeen = true; 2109 2110 // Read number of records in this block 2111 noBufferedRecords = WritableUtils.readVInt(in); 2112 2113 // Read key lengths and keys 2114 readBuffer(keyLenBuffer, keyLenInFilter); 2115 readBuffer(keyBuffer, keyInFilter); 2116 noBufferedKeys = noBufferedRecords; 2117 2118 // Read value lengths and values 2119 if (!lazyDecompress) { 2120 readBuffer(valLenBuffer, valLenInFilter); 2121 readBuffer(valBuffer, valInFilter); 2122 noBufferedValues = noBufferedRecords; 2123 valuesDecompressed = true; 2124 } 2125 } 2126 2127 /** 2128 * Position valLenIn/valIn to the 'value' 2129 * corresponding to the 'current' key 2130 */ 2131 private synchronized void seekToCurrentValue() throws IOException { 2132 if (!blockCompressed) { 2133 if (decompress) { 2134 valInFilter.resetState(); 2135 } 2136 valBuffer.reset(); 2137 } else { 2138 // Check if this is the first value in the 'block' to be read 2139 if (lazyDecompress && !valuesDecompressed) { 2140 // Read the value lengths and values 2141 readBuffer(valLenBuffer, valLenInFilter); 2142 readBuffer(valBuffer, valInFilter); 2143 noBufferedValues = noBufferedRecords; 2144 valuesDecompressed = true; 2145 } 2146 2147 // Calculate the no. of bytes to skip 2148 // Note: 'current' key has already been read! 2149 int skipValBytes = 0; 2150 int currentKey = noBufferedKeys + 1; 2151 for (int i=noBufferedValues; i > currentKey; --i) { 2152 skipValBytes += WritableUtils.readVInt(valLenIn); 2153 --noBufferedValues; 2154 } 2155 2156 // Skip to the 'val' corresponding to 'current' key 2157 if (skipValBytes > 0) { 2158 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2159 throw new IOException("Failed to seek to " + currentKey + 2160 "(th) value!"); 2161 } 2162 } 2163 } 2164 } 2165 2166 /** 2167 * Get the 'value' corresponding to the last read 'key'. 2168 * @param val : The 'value' to be read. 
2169 * @throws IOException 2170 */ 2171 public synchronized void getCurrentValue(Writable val) 2172 throws IOException { 2173 if (val instanceof Configurable) { 2174 ((Configurable) val).setConf(this.conf); 2175 } 2176 2177 // Position stream to 'current' value 2178 seekToCurrentValue(); 2179 2180 if (!blockCompressed) { 2181 val.readFields(valIn); 2182 2183 if (valIn.read() > 0) { 2184 LOG.info("available bytes: " + valIn.available()); 2185 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2186 + " bytes, should read " + 2187 (valBuffer.getLength()-keyLength)); 2188 } 2189 } else { 2190 // Get the value 2191 int valLength = WritableUtils.readVInt(valLenIn); 2192 val.readFields(valIn); 2193 2194 // Read another compressed 'value' 2195 --noBufferedValues; 2196 2197 // Sanity check 2198 if ((valLength < 0) && LOG.isDebugEnabled()) { 2199 LOG.debug(val + " is a zero-length value"); 2200 } 2201 } 2202 2203 } 2204 2205 /** 2206 * Get the 'value' corresponding to the last read 'key'. 2207 * @param val : The 'value' to be read. 2208 * @throws IOException 2209 */ 2210 public synchronized Object getCurrentValue(Object val) 2211 throws IOException { 2212 if (val instanceof Configurable) { 2213 ((Configurable) val).setConf(this.conf); 2214 } 2215 2216 // Position stream to 'current' value 2217 seekToCurrentValue(); 2218 2219 if (!blockCompressed) { 2220 val = deserializeValue(val); 2221 2222 if (valIn.read() > 0) { 2223 LOG.info("available bytes: " + valIn.available()); 2224 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2225 + " bytes, should read " + 2226 (valBuffer.getLength()-keyLength)); 2227 } 2228 } else { 2229 // Get the value 2230 int valLength = WritableUtils.readVInt(valLenIn); 2231 val = deserializeValue(val); 2232 2233 // Read another compressed 'value' 2234 --noBufferedValues; 2235 2236 // Sanity check 2237 if ((valLength < 0) && LOG.isDebugEnabled()) { 2238 LOG.debug(val + " is a zero-length value"); 2239 } 2240 } 2241 return val; 2242 2243 } 2244 2245 @SuppressWarnings("unchecked") 2246 private Object deserializeValue(Object val) throws IOException { 2247 return valDeserializer.deserialize(val); 2248 } 2249 2250 /** Read the next key in the file into <code>key</code>, skipping its 2251 * value. True if another entry exists, and false at end of file. */ 2252 public synchronized boolean next(Writable key) throws IOException { 2253 if (key.getClass() != getKeyClass()) 2254 throw new IOException("wrong key class: "+key.getClass().getName() 2255 +" is not "+keyClass); 2256 2257 if (!blockCompressed) { 2258 outBuf.reset(); 2259 2260 keyLength = next(outBuf); 2261 if (keyLength < 0) 2262 return false; 2263 2264 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2265 2266 key.readFields(valBuffer); 2267 valBuffer.mark(0); 2268 if (valBuffer.getPosition() != keyLength) 2269 throw new IOException(key + " read " + valBuffer.getPosition() 2270 + " bytes, should read " + keyLength); 2271 } else { 2272 //Reset syncSeen 2273 syncSeen = false; 2274 2275 if (noBufferedKeys == 0) { 2276 try { 2277 readBlock(); 2278 } catch (EOFException eof) { 2279 return false; 2280 } 2281 } 2282 2283 int keyLength = WritableUtils.readVInt(keyLenIn); 2284 2285 // Sanity check 2286 if (keyLength < 0) { 2287 return false; 2288 } 2289 2290 //Read another compressed 'key' 2291 key.readFields(keyIn); 2292 --noBufferedKeys; 2293 } 2294 2295 return true; 2296 } 2297 2298 /** Read the next key/value pair in the file into <code>key</code> and 2299 * <code>val</code>. 
Returns true if such a pair exists and false when at 2300 * end of file */ 2301 public synchronized boolean next(Writable key, Writable val) 2302 throws IOException { 2303 if (val.getClass() != getValueClass()) 2304 throw new IOException("wrong value class: "+val+" is not "+valClass); 2305 2306 boolean more = next(key); 2307 2308 if (more) { 2309 getCurrentValue(val); 2310 } 2311 2312 return more; 2313 } 2314 2315 /** 2316 * Read and return the next record length, potentially skipping over 2317 * a sync block. 2318 * @return the length of the next record or -1 if there is no next record 2319 * @throws IOException 2320 */ 2321 private synchronized int readRecordLength() throws IOException { 2322 if (in.getPos() >= end) { 2323 return -1; 2324 } 2325 int length = in.readInt(); 2326 if (version > 1 && sync != null && 2327 length == SYNC_ESCAPE) { // process a sync entry 2328 in.readFully(syncCheck); // read syncCheck 2329 if (!Arrays.equals(sync, syncCheck)) // check it 2330 throw new IOException("File is corrupt!"); 2331 syncSeen = true; 2332 if (in.getPos() >= end) { 2333 return -1; 2334 } 2335 length = in.readInt(); // re-read length 2336 } else { 2337 syncSeen = false; 2338 } 2339 2340 return length; 2341 } 2342 2343 /** Read the next key/value pair in the file into <code>buffer</code>. 2344 * Returns the length of the key read, or -1 if at end of file. The length 2345 * of the value may be computed by calling buffer.getLength() before and 2346 * after calls to this method. 2347 * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ 2348 @Deprecated 2349 synchronized int next(DataOutputBuffer buffer) throws IOException { 2350 // Unsupported for block-compressed sequence files 2351 if (blockCompressed) { 2352 throw new IOException("Unsupported call for block-compressed" + 2353 " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)"); 2354 } 2355 try { 2356 int length = readRecordLength(); 2357 if (length == -1) { 2358 return -1; 2359 } 2360 int keyLength = in.readInt(); 2361 buffer.write(in, length); 2362 return keyLength; 2363 } catch (ChecksumException e) { // checksum failure 2364 handleChecksumException(e); 2365 return next(buffer); 2366 } 2367 } 2368 2369 public ValueBytes createValueBytes() { 2370 ValueBytes val = null; 2371 if (!decompress || blockCompressed) { 2372 val = new UncompressedBytes(); 2373 } else { 2374 val = new CompressedBytes(codec); 2375 } 2376 return val; 2377 } 2378 2379 /** 2380 * Read 'raw' records.
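 * <p>A sketch of raw iteration (assumes a constructed {@code reader});
 * raw records bypass deserialization, so the bytes can be forwarded
 * untouched, e.g. via {@link ValueBytes#writeUncompressedBytes}:</p>
 * <pre>
 *   DataOutputBuffer rawKey = new DataOutputBuffer();
 *   ValueBytes rawVal = reader.createValueBytes();
 *   while (reader.nextRaw(rawKey, rawVal) != -1) {
 *     // rawKey holds the serialized key bytes; consume them, then reset
 *     rawKey.reset();
 *   }
 * </pre>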
2381 * @param key - The buffer into which the key is read 2382 * @param val - The 'raw' value 2383 * @return Returns the total record length or -1 for end of file 2384 * @throws IOException 2385 */ 2386 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 2387 throws IOException { 2388 if (!blockCompressed) { 2389 int length = readRecordLength(); 2390 if (length == -1) { 2391 return -1; 2392 } 2393 int keyLength = in.readInt(); 2394 int valLength = length - keyLength; 2395 key.write(in, keyLength); 2396 if (decompress) { 2397 CompressedBytes value = (CompressedBytes)val; 2398 value.reset(in, valLength); 2399 } else { 2400 UncompressedBytes value = (UncompressedBytes)val; 2401 value.reset(in, valLength); 2402 } 2403 2404 return length; 2405 } else { 2406 //Reset syncSeen 2407 syncSeen = false; 2408 2409 // Read 'key' 2410 if (noBufferedKeys == 0) { 2411 if (in.getPos() >= end) 2412 return -1; 2413 2414 try { 2415 readBlock(); 2416 } catch (EOFException eof) { 2417 return -1; 2418 } 2419 } 2420 int keyLength = WritableUtils.readVInt(keyLenIn); 2421 if (keyLength < 0) { 2422 throw new IOException("zero length key found!"); 2423 } 2424 key.write(keyIn, keyLength); 2425 --noBufferedKeys; 2426 2427 // Read raw 'value' 2428 seekToCurrentValue(); 2429 int valLength = WritableUtils.readVInt(valLenIn); 2430 UncompressedBytes rawValue = (UncompressedBytes)val; 2431 rawValue.reset(valIn, valLength); 2432 --noBufferedValues; 2433 2434 return (keyLength+valLength); 2435 } 2436 2437 } 2438 2439 /** 2440 * Read 'raw' keys. 2441 * @param key - The buffer into which the key is read 2442 * @return Returns the key length or -1 for end of file 2443 * @throws IOException 2444 */ 2445 public synchronized int nextRawKey(DataOutputBuffer key) 2446 throws IOException { 2447 if (!blockCompressed) { 2448 recordLength = readRecordLength(); 2449 if (recordLength == -1) { 2450 return -1; 2451 } 2452 keyLength = in.readInt(); 2453 key.write(in, keyLength); 2454 return keyLength; 2455 } else { 2456 //Reset syncSeen 2457 syncSeen = false; 2458 2459 // Read 'key' 2460 if (noBufferedKeys == 0) { 2461 if (in.getPos() >= end) 2462 return -1; 2463 2464 try { 2465 readBlock(); 2466 } catch (EOFException eof) { 2467 return -1; 2468 } 2469 } 2470 int keyLength = WritableUtils.readVInt(keyLenIn); 2471 if (keyLength < 0) { 2472 throw new IOException("zero length key found!"); 2473 } 2474 key.write(keyIn, keyLength); 2475 --noBufferedKeys; 2476 2477 return keyLength; 2478 } 2479 2480 } 2481 2482 /** Read the next key in the file, skipping its 2483 * value. Return null at end of file. 
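 * <p>A usage sketch; the instance handed back may differ from the one
 * passed in (depending on the deserializer), so always use the return
 * value:</p>
 * <pre>
 *   Object key = null;
 *   while ((key = reader.next(key)) != null) {
 *     // process key; call getCurrentValue(...) if the value is needed
 *   }
 * </pre>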
*/ 2484 public synchronized Object next(Object key) throws IOException { 2485 if (key != null && key.getClass() != getKeyClass()) { 2486 throw new IOException("wrong key class: "+key.getClass().getName() 2487 +" is not "+keyClass); 2488 } 2489 2490 if (!blockCompressed) { 2491 outBuf.reset(); 2492 2493 keyLength = next(outBuf); 2494 if (keyLength < 0) 2495 return null; 2496 2497 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2498 2499 key = deserializeKey(key); 2500 valBuffer.mark(0); 2501 if (valBuffer.getPosition() != keyLength) 2502 throw new IOException(key + " read " + valBuffer.getPosition() 2503 + " bytes, should read " + keyLength); 2504 } else { 2505 //Reset syncSeen 2506 syncSeen = false; 2507 2508 if (noBufferedKeys == 0) { 2509 try { 2510 readBlock(); 2511 } catch (EOFException eof) { 2512 return null; 2513 } 2514 } 2515 2516 int keyLength = WritableUtils.readVInt(keyLenIn); 2517 2518 // Sanity check 2519 if (keyLength < 0) { 2520 return null; 2521 } 2522 2523 //Read another compressed 'key' 2524 key = deserializeKey(key); 2525 --noBufferedKeys; 2526 } 2527 2528 return key; 2529 } 2530 2531 @SuppressWarnings("unchecked") 2532 private Object deserializeKey(Object key) throws IOException { 2533 return keyDeserializer.deserialize(key); 2534 } 2535 2536 /** 2537 * Read 'raw' values. 2538 * @param val - The 'raw' value 2539 * @return Returns the value length 2540 * @throws IOException 2541 */ 2542 public synchronized int nextRawValue(ValueBytes val) 2543 throws IOException { 2544 2545 // Position stream to current value 2546 seekToCurrentValue(); 2547 2548 if (!blockCompressed) { 2549 int valLength = recordLength - keyLength; 2550 if (decompress) { 2551 CompressedBytes value = (CompressedBytes)val; 2552 value.reset(in, valLength); 2553 } else { 2554 UncompressedBytes value = (UncompressedBytes)val; 2555 value.reset(in, valLength); 2556 } 2557 2558 return valLength; 2559 } else { 2560 int valLength = WritableUtils.readVInt(valLenIn); 2561 UncompressedBytes rawValue = (UncompressedBytes)val; 2562 rawValue.reset(valIn, valLength); 2563 --noBufferedValues; 2564 return valLength; 2565 } 2566 2567 } 2568 2569 private void handleChecksumException(ChecksumException e) 2570 throws IOException { 2571 if (this.conf.getBoolean("io.skip.checksum.errors", false)) { 2572 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2573 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2574 } else { 2575 throw e; 2576 } 2577 } 2578 2579 /** disables sync. often invoked for tmp files */ 2580 synchronized void ignoreSync() { 2581 sync = null; 2582 } 2583 2584 /** Set the current byte position in the input file. 2585 * 2586 * <p>The position passed must be a position returned by {@link 2587 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2588 * position, use {@link SequenceFile.Reader#sync(long)}. 
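 * <p>A sketch of the two positioning styles (assumes {@code pos} was
 * recorded via {@link SequenceFile.Writer#getLength()} while writing,
 * and {@code anyPos} is an arbitrary byte offset):</p>
 * <pre>
 *   reader.seek(pos);    // must be an exact record boundary
 *   reader.sync(anyPos); // otherwise: advance to the next sync mark
 * </pre>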
2589 */ 2590 public synchronized void seek(long position) throws IOException { 2591 in.seek(position); 2592 if (blockCompressed) { // trigger block read 2593 noBufferedKeys = 0; 2594 valuesDecompressed = true; 2595 } 2596 } 2597 2598 /** Seek to the next sync mark past a given position.*/ 2599 public synchronized void sync(long position) throws IOException { 2600 if (position+SYNC_SIZE >= end) { 2601 seek(end); 2602 return; 2603 } 2604 2605 if (position < headerEnd) { 2606 // seek directly to first record 2607 in.seek(headerEnd); 2608 // note the sync marker "seen" in the header 2609 syncSeen = true; 2610 return; 2611 } 2612 2613 try { 2614 seek(position+4); // skip escape 2615 in.readFully(syncCheck); 2616 int syncLen = sync.length; 2617 for (int i = 0; in.getPos() < end; i++) { 2618 int j = 0; 2619 for (; j < syncLen; j++) { 2620 if (sync[j] != syncCheck[(i+j)%syncLen]) 2621 break; 2622 } 2623 if (j == syncLen) { 2624 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2625 return; 2626 } 2627 syncCheck[i%syncLen] = in.readByte(); 2628 } 2629 } catch (ChecksumException e) { // checksum failure 2630 handleChecksumException(e); 2631 } 2632 } 2633 2634 /** Returns true iff the previous call to next passed a sync mark.*/ 2635 public synchronized boolean syncSeen() { return syncSeen; } 2636 2637 /** Return the current byte position in the input file. */ 2638 public synchronized long getPosition() throws IOException { 2639 return in.getPos(); 2640 } 2641 2642 /** Returns the name of the file. */ 2643 @Override 2644 public String toString() { 2645 return filename; 2646 } 2647 2648 } 2649 2650 /** Sorts key/value pairs in a sequence-format file. 2651 * 2652 * <p>For best performance, applications should make sure that the {@link 2653 * Writable#readFields(DataInput)} implementation of their keys is 2654 * very efficient. In particular, it should avoid allocating memory. 2655 */ 2656 public static class Sorter { 2657 2658 private RawComparator comparator; 2659 2660 private MergeSort mergeSort; //the implementation of merge sort 2661 2662 private Path[] inFiles; // when merging or sorting 2663 2664 private Path outFile; 2665 2666 private int memory; // bytes 2667 private int factor; // merged per pass 2668 2669 private FileSystem fs = null; 2670 2671 private Class keyClass; 2672 private Class valClass; 2673 2674 private Configuration conf; 2675 private Metadata metadata; 2676 2677 private Progressable progressable = null; 2678 2679 /** Sort and merge files containing the named classes. */ 2680 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2681 Class valClass, Configuration conf) { 2682 this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf); 2683 } 2684 2685 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2686 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2687 Class valClass, Configuration conf) { 2688 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2689 } 2690 2691 /** Sort and merge using an arbitrary {@link RawComparator}. 
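 * <p>A construction sketch (the {@code IntWritable}/{@code Text} pairing
 * is illustrative; any {@link RawComparator} over the raw key bytes
 * works):</p>
 * <pre>
 *   Sorter sorter = new Sorter(fs,
 *       WritableComparator.get(IntWritable.class, conf),
 *       IntWritable.class, Text.class, conf, new Metadata());
 * </pre>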
*/ 2692 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2693 Class valClass, Configuration conf, Metadata metadata) { 2694 this.fs = fs; 2695 this.comparator = comparator; 2696 this.keyClass = keyClass; 2697 this.valClass = valClass; 2698 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2699 this.factor = conf.getInt("io.sort.factor", 100); 2700 this.conf = conf; 2701 this.metadata = metadata; 2702 } 2703 2704 /** Set the number of streams to merge at once.*/ 2705 public void setFactor(int factor) { this.factor = factor; } 2706 2707 /** Get the number of streams to merge at once.*/ 2708 public int getFactor() { return factor; } 2709 2710 /** Set the total amount of buffer memory, in bytes.*/ 2711 public void setMemory(int memory) { this.memory = memory; } 2712 2713 /** Get the total amount of buffer memory, in bytes.*/ 2714 public int getMemory() { return memory; } 2715 2716 /** Set the progressable object in order to report progress. */ 2717 public void setProgressable(Progressable progressable) { 2718 this.progressable = progressable; 2719 } 2720 2721 /** 2722 * Perform a file sort from a set of input files into an output file. 2723 * @param inFiles the files to be sorted 2724 * @param outFile the sorted output file 2725 * @param deleteInput should the input files be deleted as they are read? 2726 */ 2727 public void sort(Path[] inFiles, Path outFile, 2728 boolean deleteInput) throws IOException { 2729 if (fs.exists(outFile)) { 2730 throw new IOException("already exists: " + outFile); 2731 } 2732 2733 this.inFiles = inFiles; 2734 this.outFile = outFile; 2735 2736 int segments = sortPass(deleteInput); 2737 if (segments > 1) { 2738 mergePass(outFile.getParent()); 2739 } 2740 } 2741 2742 /** 2743 * Perform a file sort from a set of input files and return an iterator. 2744 * @param inFiles the files to be sorted 2745 * @param tempDir the directory where temp files are created during sort 2746 * @param deleteInput should the input files be deleted as they are read? 2747 * @return iterator the RawKeyValueIterator 2748 */ 2749 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2750 boolean deleteInput) throws IOException { 2751 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2752 if (fs.exists(outFile)) { 2753 throw new IOException("already exists: " + outFile); 2754 } 2755 this.inFiles = inFiles; 2756 //outFile will basically be used as prefix for temp files in the cases 2757 //where sort outputs multiple sorted segments. For the single segment 2758 //case, the outputFile itself will contain the sorted data for that 2759 //segment 2760 this.outFile = outFile; 2761 2762 int segments = sortPass(deleteInput); 2763 if (segments > 1) 2764 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2765 tempDir); 2766 else if (segments == 1) 2767 return merge(new Path[]{outFile}, true, tempDir); 2768 else return null; 2769 } 2770 2771 /** 2772 * The backwards compatible interface to sort. 
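 * <p>Equivalent to {@code sort(new Path[]{inFile}, outFile, false)},
 * i.e. a single input whose original file is preserved; a sketch
 * (illustrative path names):</p>
 * <pre>
 *   sorter.sort(new Path("input.seq"), new Path("sorted.seq"));
 * </pre>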
2773 * @param inFile the input file to sort 2774 * @param outFile the sorted output file 2775 */ 2776 public void sort(Path inFile, Path outFile) throws IOException { 2777 sort(new Path[]{inFile}, outFile, false); 2778 } 2779 2780 private int sortPass(boolean deleteInput) throws IOException { 2781 if(LOG.isDebugEnabled()) { 2782 LOG.debug("running sort pass"); 2783 } 2784 SortPass sortPass = new SortPass(); // make the SortPass 2785 sortPass.setProgressable(progressable); 2786 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2787 try { 2788 return sortPass.run(deleteInput); // run it 2789 } finally { 2790 sortPass.close(); // close it 2791 } 2792 } 2793 2794 private class SortPass { 2795 private int memoryLimit = memory/4; 2796 private int recordLimit = 1000000; 2797 2798 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2799 private byte[] rawBuffer; 2800 2801 private int[] keyOffsets = new int[1024]; 2802 private int[] pointers = new int[keyOffsets.length]; 2803 private int[] pointersCopy = new int[keyOffsets.length]; 2804 private int[] keyLengths = new int[keyOffsets.length]; 2805 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2806 2807 private ArrayList segmentLengths = new ArrayList(); 2808 2809 private Reader in = null; 2810 private FSDataOutputStream out = null; 2811 private FSDataOutputStream indexOut = null; 2812 private Path outName; 2813 2814 private Progressable progressable = null; 2815 2816 public int run(boolean deleteInput) throws IOException { 2817 int segments = 0; 2818 int currentFile = 0; 2819 boolean atEof = (currentFile >= inFiles.length); 2820 CompressionType compressionType; 2821 CompressionCodec codec = null; 2822 segmentLengths.clear(); 2823 if (atEof) { 2824 return 0; 2825 } 2826 2827 // Initialize 2828 in = new Reader(fs, inFiles[currentFile], conf); 2829 compressionType = in.getCompressionType(); 2830 codec = in.getCompressionCodec(); 2831 2832 for (int i=0; i < rawValues.length; ++i) { 2833 rawValues[i] = null; 2834 } 2835 2836 while (!atEof) { 2837 int count = 0; 2838 int bytesProcessed = 0; 2839 rawKeys.reset(); 2840 while (!atEof && 2841 bytesProcessed < memoryLimit && count < recordLimit) { 2842 2843 // Read a record into buffer 2844 // Note: Attempt to re-use 'rawValue' as far as possible 2845 int keyOffset = rawKeys.getLength(); 2846 ValueBytes rawValue = 2847 (count == keyOffsets.length || rawValues[count] == null) ? 
2848 in.createValueBytes() : 2849 rawValues[count]; 2850 int recordLength = in.nextRaw(rawKeys, rawValue); 2851 if (recordLength == -1) { 2852 in.close(); 2853 if (deleteInput) { 2854 fs.delete(inFiles[currentFile], true); 2855 } 2856 currentFile += 1; 2857 atEof = currentFile >= inFiles.length; 2858 if (!atEof) { 2859 in = new Reader(fs, inFiles[currentFile], conf); 2860 } else { 2861 in = null; 2862 } 2863 continue; 2864 } 2865 2866 int keyLength = rawKeys.getLength() - keyOffset; 2867 2868 if (count == keyOffsets.length) 2869 grow(); 2870 2871 keyOffsets[count] = keyOffset; // update pointers 2872 pointers[count] = count; 2873 keyLengths[count] = keyLength; 2874 rawValues[count] = rawValue; 2875 2876 bytesProcessed += recordLength; 2877 count++; 2878 } 2879 2880 // buffer is full -- sort & flush it 2881 if(LOG.isDebugEnabled()) { 2882 LOG.debug("flushing segment " + segments); 2883 } 2884 rawBuffer = rawKeys.getData(); 2885 sort(count); 2886 // indicate we're making progress 2887 if (progressable != null) { 2888 progressable.progress(); 2889 } 2890 flush(count, bytesProcessed, compressionType, codec, 2891 segments==0 && atEof); 2892 segments++; 2893 } 2894 return segments; 2895 } 2896 2897 public void close() throws IOException { 2898 if (in != null) { 2899 in.close(); 2900 } 2901 if (out != null) { 2902 out.close(); 2903 } 2904 if (indexOut != null) { 2905 indexOut.close(); 2906 } 2907 } 2908 2909 private void grow() { 2910 int newLength = keyOffsets.length * 3 / 2; 2911 keyOffsets = grow(keyOffsets, newLength); 2912 pointers = grow(pointers, newLength); 2913 pointersCopy = new int[newLength]; 2914 keyLengths = grow(keyLengths, newLength); 2915 rawValues = grow(rawValues, newLength); 2916 } 2917 2918 private int[] grow(int[] old, int newLength) { 2919 int[] result = new int[newLength]; 2920 System.arraycopy(old, 0, result, 0, old.length); 2921 return result; 2922 } 2923 2924 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 2925 ValueBytes[] result = new ValueBytes[newLength]; 2926 System.arraycopy(old, 0, result, 0, old.length); 2927 for (int i=old.length; i < newLength; ++i) { 2928 result[i] = null; 2929 } 2930 return result; 2931 } 2932 2933 private void flush(int count, int bytesProcessed, 2934 CompressionType compressionType, 2935 CompressionCodec codec, 2936 boolean done) throws IOException { 2937 if (out == null) { 2938 outName = done ? outFile : outFile.suffix(".0"); 2939 out = fs.create(outName); 2940 if (!done) { 2941 indexOut = fs.create(outName.suffix(".index")); 2942 } 2943 } 2944 2945 long segmentStart = out.getPos(); 2946 Writer writer = createWriter(conf, Writer.stream(out), 2947 Writer.keyClass(keyClass), Writer.valueClass(valClass), 2948 Writer.compression(compressionType, codec), 2949 Writer.metadata(done ? 
metadata : new Metadata())); 2950 2951 if (!done) { 2952 writer.sync = null; // disable sync on temp files 2953 } 2954 2955 for (int i = 0; i < count; i++) { // write in sorted order 2956 int p = pointers[i]; 2957 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); 2958 } 2959 writer.close(); 2960 2961 if (!done) { 2962 // Save the segment length 2963 WritableUtils.writeVLong(indexOut, segmentStart); 2964 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart)); 2965 indexOut.flush(); 2966 } 2967 } 2968 2969 private void sort(int count) { 2970 System.arraycopy(pointers, 0, pointersCopy, 0, count); 2971 mergeSort.mergeSort(pointersCopy, pointers, 0, count); 2972 } 2973 class SeqFileComparator implements Comparator<IntWritable> { 2974 @Override 2975 public int compare(IntWritable I, IntWritable J) { 2976 return comparator.compare(rawBuffer, keyOffsets[I.get()], 2977 keyLengths[I.get()], rawBuffer, 2978 keyOffsets[J.get()], keyLengths[J.get()]); 2979 } 2980 } 2981 2982 /** Set the progressable object in order to report progress. */ 2983 public void setProgressable(Progressable progressable) 2984 { 2985 this.progressable = progressable; 2986 } 2987 2988 } // SequenceFile.Sorter.SortPass 2989 2990 /** The interface to iterate over raw keys/values of SequenceFiles. */ 2991 public static interface RawKeyValueIterator { 2992 /** Gets the current raw key 2993 * @return DataOutputBuffer 2994 * @throws IOException 2995 */ 2996 DataOutputBuffer getKey() throws IOException; 2997 /** Gets the current raw value 2998 * @return ValueBytes 2999 * @throws IOException 3000 */ 3001 ValueBytes getValue() throws IOException; 3002 /** Sets up the current key and value (for getKey and getValue) 3003 * @return true if there exists a key/value, false otherwise 3004 * @throws IOException 3005 */ 3006 boolean next() throws IOException; 3007 /** Closes the iterator so that the underlying streams can be closed 3008 * @throws IOException 3009 */ 3010 void close() throws IOException; 3011 /** Gets the Progress object; this has a float (0.0 - 1.0) 3012 * indicating the bytes processed by the iterator so far 3013 */ 3014 Progress getProgress(); 3015 } 3016 3017 /** 3018 * Merges the list of segments of type <code>SegmentDescriptor</code> 3019 * @param segments the list of SegmentDescriptors 3020 * @param tmpDir the directory to write temporary files into 3021 * @return RawKeyValueIterator 3022 * @throws IOException 3023 */ 3024 public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 3025 Path tmpDir) 3026 throws IOException { 3027 // pass in object to report progress, if present 3028 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); 3029 return mQueue.merge(); 3030 } 3031 3032 /** 3033 * Merges the contents of files passed in Path[] using a max factor value 3034 * that is already set. 3035 * @param inNames the array of path names 3036 * @param deleteInputs true if the input files should be deleted when 3037 * unnecessary 3038 * @param tmpDir the directory to write temporary files into 3039 * @return RawKeyValueIterator 3040 * @throws IOException 3041 */ 3042 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3043 Path tmpDir) 3044 throws IOException { 3045 return merge(inNames, deleteInputs, 3046 (inNames.length < factor) ?
inNames.length : factor, 3047 tmpDir); 3048 } 3049 3050 /** 3051 * Merges the contents of files passed in Path[] 3052 * @param inNames the array of path names 3053 * @param deleteInputs true if the input files should be deleted when 3054 * unnecessary 3055 * @param factor the factor that will be used as the maximum merge fan-in 3056 * @param tmpDir the directory to write temporary files into 3057 * @return RawKeyValueIterator 3058 * @throws IOException 3059 */ 3060 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3061 int factor, Path tmpDir) 3062 throws IOException { 3063 //get the segments from inNames 3064 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3065 for (int i = 0; i < inNames.length; i++) { 3066 SegmentDescriptor s = new SegmentDescriptor(0, 3067 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3068 s.preserveInput(!deleteInputs); 3069 s.doSync(); 3070 a.add(s); 3071 } 3072 this.factor = factor; 3073 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); 3074 return mQueue.merge(); 3075 } 3076 3077 /** 3078 * Merges the contents of files passed in Path[] 3079 * @param inNames the array of path names 3080 * @param tempDir the directory for creating temp files during merge 3081 * @param deleteInputs true if the input files should be deleted when 3082 * unnecessary 3083 * @return RawKeyValueIterator 3084 * @throws IOException 3085 */ 3086 public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 3087 boolean deleteInputs) 3088 throws IOException { 3089 //outFile will basically be used as prefix for temp files for the 3090 //intermediate merge outputs 3091 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged"); 3092 //get the segments from inNames 3093 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3094 for (int i = 0; i < inNames.length; i++) { 3095 SegmentDescriptor s = new SegmentDescriptor(0, 3096 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3097 s.preserveInput(!deleteInputs); 3098 s.doSync(); 3099 a.add(s); 3100 } 3101 factor = (inNames.length < factor) ?
inNames.length : factor; 3102 // pass in object to report progress, if present 3103 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); 3104 return mQueue.merge(); 3105 } 3106 3107 /** 3108 * Clones the attributes (like compression) of the input file and creates a 3109 * corresponding Writer. 3110 * @param inputFile the path of the input file whose attributes should be 3111 * cloned 3112 * @param outputFile the path of the output file 3113 * @param prog the Progressable to report status during the file write 3114 * @return Writer 3115 * @throws IOException 3116 */ 3117 public Writer cloneFileAttributes(Path inputFile, Path outputFile, 3118 Progressable prog) throws IOException { 3119 Reader reader = new Reader(conf, 3120 Reader.file(inputFile), 3121 new Reader.OnlyHeaderOption()); 3122 CompressionType compress = reader.getCompressionType(); 3123 CompressionCodec codec = reader.getCompressionCodec(); 3124 reader.close(); 3125 3126 Writer writer = createWriter(conf, 3127 Writer.file(outputFile), 3128 Writer.keyClass(keyClass), 3129 Writer.valueClass(valClass), 3130 Writer.compression(compress, codec), 3131 Writer.progressable(prog)); 3132 return writer; 3133 } 3134 3135 /** 3136 * Writes records from RawKeyValueIterator into a file represented by the 3137 * passed writer. 3138 * @param records the RawKeyValueIterator 3139 * @param writer the Writer created earlier 3140 * @throws IOException 3141 */ 3142 public void writeFile(RawKeyValueIterator records, Writer writer) 3143 throws IOException { 3144 while(records.next()) { 3145 writer.appendRaw(records.getKey().getData(), 0, 3146 records.getKey().getLength(), records.getValue()); 3147 } 3148 writer.sync(); 3149 } 3150 3151 /** Merge the provided files. 3152 * @param inFiles the array of input path names 3153 * @param outFile the final output file 3154 * @throws IOException 3155 */ 3156 public void merge(Path[] inFiles, Path outFile) throws IOException { 3157 if (fs.exists(outFile)) { 3158 throw new IOException("already exists: " + outFile); 3159 } 3160 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); 3161 Writer writer = cloneFileAttributes(inFiles[0], outFile, null); 3162 3163 writeFile(r, writer); 3164 3165 writer.close(); 3166 } 3167 3168 /** Sort calls this to generate the final merged output. */ 3169 private int mergePass(Path tmpDir) throws IOException { 3170 if(LOG.isDebugEnabled()) { 3171 LOG.debug("running merge pass"); 3172 } 3173 Writer writer = cloneFileAttributes( 3174 outFile.suffix(".0"), outFile, null); 3175 RawKeyValueIterator r = merge(outFile.suffix(".0"), 3176 outFile.suffix(".0.index"), tmpDir); 3177 writeFile(r, writer); 3178 3179 writer.close(); 3180 return 0; 3181 } 3182 3183 /** Used by mergePass to merge the output of the sort 3184 * @param inName the name of the input file containing sorted segments 3185 * @param indexIn the offsets of the sorted segments 3186 * @param tmpDir the relative directory to store intermediate results in 3187 * @return RawKeyValueIterator 3188 * @throws IOException 3189 */ 3190 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 3191 throws IOException { 3192 //get the segments from indexIn 3193 //we create a SegmentContainer so that we can track segments belonging to 3194 //inName and delete inName as soon as we see that we have looked at all 3195 //the contained segments during the merge process & hence don't need 3196 //them anymore 3197 SegmentContainer container = new SegmentContainer(inName, indexIn); 3198 MergeQueue mQueue = new
MergeQueue(container.getSegmentList(), tmpDir, progressable); 3199 return mQueue.merge(); 3200 } 3201 3202 /** This class implements the core of the merge logic */ 3203 private class MergeQueue extends PriorityQueue 3204 implements RawKeyValueIterator { 3205 private boolean compress; 3206 private boolean blockCompress; 3207 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3208 private ValueBytes rawValue; 3209 private long totalBytesProcessed; 3210 private float progPerByte; 3211 private Progress mergeProgress = new Progress(); 3212 private Path tmpDir; 3213 private Progressable progress = null; //handle to the progress reporting object 3214 private SegmentDescriptor minSegment; 3215 3216 //a TreeMap used to store the segments sorted by size (segment offset and 3217 //segment path name is used to break ties between segments of same sizes) 3218 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3219 new TreeMap<SegmentDescriptor, Void>(); 3220 3221 @SuppressWarnings("unchecked") 3222 public void put(SegmentDescriptor stream) throws IOException { 3223 if (size() == 0) { 3224 compress = stream.in.isCompressed(); 3225 blockCompress = stream.in.isBlockCompressed(); 3226 } else if (compress != stream.in.isCompressed() || 3227 blockCompress != stream.in.isBlockCompressed()) { 3228 throw new IOException("All merged files must be compressed or not."); 3229 } 3230 super.put(stream); 3231 } 3232 3233 /** 3234 * A queue of file segments to merge 3235 * @param segments the file segments to merge 3236 * @param tmpDir a relative local directory to save intermediate files in 3237 * @param progress the reference to the Progressable object 3238 */ 3239 public MergeQueue(List <SegmentDescriptor> segments, 3240 Path tmpDir, Progressable progress) { 3241 int size = segments.size(); 3242 for (int i = 0; i < size; i++) { 3243 sortedSegmentSizes.put(segments.get(i), null); 3244 } 3245 this.tmpDir = tmpDir; 3246 this.progress = progress; 3247 } 3248 @Override 3249 protected boolean lessThan(Object a, Object b) { 3250 // indicate we're making progress 3251 if (progress != null) { 3252 progress.progress(); 3253 } 3254 SegmentDescriptor msa = (SegmentDescriptor)a; 3255 SegmentDescriptor msb = (SegmentDescriptor)b; 3256 return comparator.compare(msa.getKey().getData(), 0, 3257 msa.getKey().getLength(), msb.getKey().getData(), 0, 3258 msb.getKey().getLength()) < 0; 3259 } 3260 @Override 3261 public void close() throws IOException { 3262 SegmentDescriptor ms; // close inputs 3263 while ((ms = (SegmentDescriptor)pop()) != null) { 3264 ms.cleanup(); 3265 } 3266 minSegment = null; 3267 } 3268 @Override 3269 public DataOutputBuffer getKey() throws IOException { 3270 return rawKey; 3271 } 3272 @Override 3273 public ValueBytes getValue() throws IOException { 3274 return rawValue; 3275 } 3276 @Override 3277 public boolean next() throws IOException { 3278 if (size() == 0) 3279 return false; 3280 if (minSegment != null) { 3281 //minSegment is non-null for all invocations of next except the first 3282 //one. For the first invocation, the priority queue is ready for use 3283 //but for the subsequent invocations, first adjust the queue 3284 adjustPriorityQueue(minSegment); 3285 if (size() == 0) { 3286 minSegment = null; 3287 return false; 3288 } 3289 } 3290 minSegment = (SegmentDescriptor)top(); 3291 long startPos = minSegment.in.getPosition(); // Current position in stream 3292 //save the raw key reference 3293 rawKey = minSegment.getKey(); 3294 //load the raw value. 
Re-use the existing rawValue buffer 3295 if (rawValue == null) { 3296 rawValue = minSegment.in.createValueBytes(); 3297 } 3298 minSegment.nextRawValue(rawValue); 3299 long endPos = minSegment.in.getPosition(); // End position after reading value 3300 updateProgress(endPos - startPos); 3301 return true; 3302 } 3303 3304 @Override 3305 public Progress getProgress() { 3306 return mergeProgress; 3307 } 3308 3309 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3310 long startPos = ms.in.getPosition(); // Current position in stream 3311 boolean hasNext = ms.nextRawKey(); 3312 long endPos = ms.in.getPosition(); // End position after reading key 3313 updateProgress(endPos - startPos); 3314 if (hasNext) { 3315 adjustTop(); 3316 } else { 3317 pop(); 3318 ms.cleanup(); 3319 } 3320 } 3321 3322 private void updateProgress(long bytesProcessed) { 3323 totalBytesProcessed += bytesProcessed; 3324 if (progPerByte > 0) { 3325 mergeProgress.set(totalBytesProcessed * progPerByte); 3326 } 3327 } 3328 3329 /** This is the single level merge that is called multiple times 3330 * depending on the factor size and the number of segments 3331 * @return RawKeyValueIterator 3332 * @throws IOException 3333 */ 3334 public RawKeyValueIterator merge() throws IOException { 3335 //create the MergeStreams from the sorted map created in the constructor 3336 //and dump the final output to a file 3337 int numSegments = sortedSegmentSizes.size(); 3338 int origFactor = factor; 3339 int passNo = 1; 3340 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3341 do { 3342 //get the factor for this pass of merge 3343 factor = getPassFactor(passNo, numSegments); 3344 List<SegmentDescriptor> segmentsToMerge = 3345 new ArrayList<SegmentDescriptor>(); 3346 int segmentsConsidered = 0; 3347 int numSegmentsToConsider = factor; 3348 while (true) { 3349 //extract the smallest 'factor' number of segment pointers from the 3350 //TreeMap. Call cleanup on the empty segments (no key/value data) 3351 SegmentDescriptor[] mStream = 3352 getSegmentDescriptors(numSegmentsToConsider); 3353 for (int i = 0; i < mStream.length; i++) { 3354 if (mStream[i].nextRawKey()) { 3355 segmentsToMerge.add(mStream[i]); 3356 segmentsConsidered++; 3357 // Count the fact that we read some bytes in calling nextRawKey() 3358 updateProgress(mStream[i].in.getPosition()); 3359 } 3360 else { 3361 mStream[i].cleanup(); 3362 numSegments--; //we ignore this segment for the merge 3363 } 3364 } 3365 //if we have the desired number of segments 3366 //or looked at all available segments, we break 3367 if (segmentsConsidered == factor || 3368 sortedSegmentSizes.size() == 0) { 3369 break; 3370 } 3371 3372 numSegmentsToConsider = factor - segmentsConsidered; 3373 } 3374 //feed the streams to the priority queue 3375 initialize(segmentsToMerge.size()); clear(); 3376 for (int i = 0; i < segmentsToMerge.size(); i++) { 3377 put(segmentsToMerge.get(i)); 3378 } 3379 //if we have lesser number of segments remaining, then just return the 3380 //iterator, else do another single level merge 3381 if (numSegments <= factor) { 3382 //calculate the length of the remaining segments. 
Required for 3383 //calculating the merge progress 3384 long totalBytes = 0; 3385 for (int i = 0; i < segmentsToMerge.size(); i++) { 3386 totalBytes += segmentsToMerge.get(i).segmentLength; 3387 } 3388 if (totalBytes != 0) //being paranoid 3389 progPerByte = 1.0f / (float)totalBytes; 3390 //reset factor to what it originally was 3391 factor = origFactor; 3392 return this; 3393 } else { 3394 //we want to spread the creation of temp files on multiple disks if 3395 //available under the space constraints 3396 long approxOutputSize = 0; 3397 for (SegmentDescriptor s : segmentsToMerge) { 3398 approxOutputSize += s.segmentLength + 3399 ChecksumFileSystem.getApproxChkSumLength( 3400 s.segmentLength); 3401 } 3402 Path tmpFilename = 3403 new Path(tmpDir, "intermediate").suffix("." + passNo); 3404 3405 Path outputFile = lDirAlloc.getLocalPathForWrite( 3406 tmpFilename.toString(), 3407 approxOutputSize, conf); 3408 if(LOG.isDebugEnabled()) { 3409 LOG.debug("writing intermediate results to " + outputFile); 3410 } 3411 Writer writer = cloneFileAttributes( 3412 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3413 fs.makeQualified(outputFile), null); 3414 writer.sync = null; //disable sync for temp files 3415 writeFile(this, writer); 3416 writer.close(); 3417 3418 //we finished one single level merge; now clean up the priority 3419 //queue 3420 this.close(); 3421 3422 SegmentDescriptor tempSegment = 3423 new SegmentDescriptor(0, 3424 fs.getFileStatus(outputFile).getLen(), outputFile); 3425 //put the segment back in the TreeMap 3426 sortedSegmentSizes.put(tempSegment, null); 3427 numSegments = sortedSegmentSizes.size(); 3428 passNo++; 3429 } 3430 //we are worried about only the first pass merge factor. So reset the 3431 //factor to what it originally was 3432 factor = origFactor; 3433 } while(true); 3434 } 3435 3436 //Hadoop-591 3437 public int getPassFactor(int passNo, int numSegments) { 3438 if (passNo > 1 || numSegments <= factor || factor == 1) 3439 return factor; 3440 int mod = (numSegments - 1) % (factor - 1); 3441 if (mod == 0) 3442 return factor; 3443 return mod + 1; 3444 } 3445 3446 /** Return (& remove) the requested number of segment descriptors from the 3447 * sorted map. 3448 */ 3449 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3450 if (numDescriptors > sortedSegmentSizes.size()) 3451 numDescriptors = sortedSegmentSizes.size(); 3452 SegmentDescriptor[] SegmentDescriptors = 3453 new SegmentDescriptor[numDescriptors]; 3454 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3455 int i = 0; 3456 while (i < numDescriptors) { 3457 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3458 iter.remove(); 3459 } 3460 return SegmentDescriptors; 3461 } 3462 } // SequenceFile.Sorter.MergeQueue 3463 3464 /** This class defines a merge segment. This class can be subclassed to 3465 * provide a customized cleanup method implementation. In this 3466 * implementation, cleanup closes the file handle and deletes the file 3467 */ 3468 public class SegmentDescriptor implements Comparable { 3469 3470 long segmentOffset; //the start of the segment in the file 3471 long segmentLength; //the length of the segment 3472 Path segmentPathName; //the path name of the file containing the segment 3473 boolean ignoreSync = true; //set to true for temp files 3474 private Reader in = null; 3475 private DataOutputBuffer rawKey = null; //this will hold the current key 3476 private boolean preserveInput = false; //delete input segment files? 
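      // Note: compareTo() below orders segments by length first (ties broken
      // by offset, then path), so the Sorter's TreeMap of segments always
      // yields the smallest segments first and each merge pass touches as
      // few bytes as possible.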
3477 3478 /** Constructs a segment 3479 * @param segmentOffset the offset of the segment in the file 3480 * @param segmentLength the length of the segment 3481 * @param segmentPathName the path name of the file containing the segment 3482 */ 3483 public SegmentDescriptor (long segmentOffset, long segmentLength, 3484 Path segmentPathName) { 3485 this.segmentOffset = segmentOffset; 3486 this.segmentLength = segmentLength; 3487 this.segmentPathName = segmentPathName; 3488 } 3489 3490 /** Do the sync checks */ 3491 public void doSync() {ignoreSync = false;} 3492 3493 /** Whether to delete the files when no longer needed */ 3494 public void preserveInput(boolean preserve) { 3495 preserveInput = preserve; 3496 } 3497 3498 public boolean shouldPreserveInput() { 3499 return preserveInput; 3500 } 3501 3502 @Override 3503 public int compareTo(Object o) { 3504 SegmentDescriptor that = (SegmentDescriptor)o; 3505 if (this.segmentLength != that.segmentLength) { 3506 return (this.segmentLength < that.segmentLength ? -1 : 1); 3507 } 3508 if (this.segmentOffset != that.segmentOffset) { 3509 return (this.segmentOffset < that.segmentOffset ? -1 : 1); 3510 } 3511 return (this.segmentPathName.toString()). 3512 compareTo(that.segmentPathName.toString()); 3513 } 3514 3515 @Override 3516 public boolean equals(Object o) { 3517 if (!(o instanceof SegmentDescriptor)) { 3518 return false; 3519 } 3520 SegmentDescriptor that = (SegmentDescriptor)o; 3521 if (this.segmentLength == that.segmentLength && 3522 this.segmentOffset == that.segmentOffset && 3523 this.segmentPathName.toString().equals( 3524 that.segmentPathName.toString())) { 3525 return true; 3526 } 3527 return false; 3528 } 3529 3530 @Override 3531 public int hashCode() { 3532 return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32)); 3533 } 3534 3535 /** Fills up the rawKey object with the key returned by the Reader 3536 * @return true if there is a key returned; false, otherwise 3537 * @throws IOException 3538 */ 3539 public boolean nextRawKey() throws IOException { 3540 if (in == null) { 3541 int bufferSize = getBufferSize(conf); 3542 Reader reader = new Reader(conf, 3543 Reader.file(segmentPathName), 3544 Reader.bufferSize(bufferSize), 3545 Reader.start(segmentOffset), 3546 Reader.length(segmentLength)); 3547 3548 //sometimes we ignore syncs especially for temp merge files 3549 if (ignoreSync) reader.ignoreSync(); 3550 3551 if (reader.getKeyClass() != keyClass) 3552 throw new IOException("wrong key class: " + reader.getKeyClass() + 3553 " is not " + keyClass); 3554 if (reader.getValueClass() != valClass) 3555 throw new IOException("wrong value class: "+reader.getValueClass()+ 3556 " is not " + valClass); 3557 this.in = reader; 3558 rawKey = new DataOutputBuffer(); 3559 } 3560 rawKey.reset(); 3561 int keyLength = 3562 in.nextRawKey(rawKey); 3563 return (keyLength >= 0); 3564 } 3565 3566 /** Fills up the passed rawValue with the value corresponding to the key 3567 * read earlier 3568 * @param rawValue 3569 * @return the length of the value 3570 * @throws IOException 3571 */ 3572 public int nextRawValue(ValueBytes rawValue) throws IOException { 3573 int valLength = in.nextRawValue(rawValue); 3574 return valLength; 3575 } 3576 3577 /** Returns the stored rawKey */ 3578 public DataOutputBuffer getKey() { 3579 return rawKey; 3580 } 3581 3582 /** closes the underlying reader */ 3583 private void close() throws IOException { 3584 this.in.close(); 3585 this.in = null; 3586 } 3587 3588 /** The default cleanup. 
Subclasses can override this with a custom 3589 * cleanup 3590 */ 3591 public void cleanup() throws IOException { 3592 close(); 3593 if (!preserveInput) { 3594 fs.delete(segmentPathName, true); 3595 } 3596 } 3597 } // SequenceFile.Sorter.SegmentDescriptor 3598 3599 /** This class provisions multiple segments contained within a single 3600 * file 3601 */ 3602 private class LinkedSegmentsDescriptor extends SegmentDescriptor { 3603 3604 SegmentContainer parentContainer = null; 3605 3606 /** Constructs a segment 3607 * @param segmentOffset the offset of the segment in the file 3608 * @param segmentLength the length of the segment 3609 * @param segmentPathName the path name of the file containing the segment 3610 * @param parent the parent SegmentContainer that holds the segment 3611 */ 3612 public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 3613 Path segmentPathName, SegmentContainer parent) { 3614 super(segmentOffset, segmentLength, segmentPathName); 3615 this.parentContainer = parent; 3616 } 3617 /** The default cleanup. Subclasses can override this with a custom 3618 * cleanup 3619 */ 3620 @Override 3621 public void cleanup() throws IOException { 3622 super.close(); 3623 if (super.shouldPreserveInput()) return; 3624 parentContainer.cleanup(); 3625 } 3626 3627 @Override 3628 public boolean equals(Object o) { 3629 if (!(o instanceof LinkedSegmentsDescriptor)) { 3630 return false; 3631 } 3632 return super.equals(o); 3633 } 3634 } //SequenceFile.Sorter.LinkedSegmentsDescriptor 3635 3636 /** The class that defines a container for segments to be merged. Primarily 3637 * required to delete temp files as soon as all the contained segments 3638 * have been looked at */ 3639 private class SegmentContainer { 3640 private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups 3641 private int numSegmentsContained; //# of segments contained 3642 private Path inName; //input file from where segments are created 3643 3644 //the list of segments read from the file 3645 private ArrayList <SegmentDescriptor> segments = 3646 new ArrayList <SegmentDescriptor>(); 3647 /** This constructor is there primarily to serve the sort routine that 3648 * generates a single output file with an associated index file */ 3649 public SegmentContainer(Path inName, Path indexIn) throws IOException { 3650 //get the segments from indexIn 3651 FSDataInputStream fsIndexIn = fs.open(indexIn); 3652 long end = fs.getFileStatus(indexIn).getLen(); 3653 while (fsIndexIn.getPos() < end) { 3654 long segmentOffset = WritableUtils.readVLong(fsIndexIn); 3655 long segmentLength = WritableUtils.readVLong(fsIndexIn); 3656 Path segmentName = inName; 3657 segments.add(new LinkedSegmentsDescriptor(segmentOffset, 3658 segmentLength, segmentName, this)); 3659 } 3660 fsIndexIn.close(); 3661 fs.delete(indexIn, true); 3662 numSegmentsContained = segments.size(); 3663 this.inName = inName; 3664 } 3665 3666 public List <SegmentDescriptor> getSegmentList() { 3667 return segments; 3668 } 3669 public void cleanup() throws IOException { 3670 numSegmentsCleanedUp++; 3671 if (numSegmentsCleanedUp == numSegmentsContained) { 3672 fs.delete(inName, true); 3673 } 3674 } 3675 } //SequenceFile.Sorter.SegmentContainer 3676 3677 } // SequenceFile.Sorter 3678 3679} // SequenceFile