001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.io; 020 021 import java.io.*; 022 import java.util.*; 023 import java.rmi.server.UID; 024 import java.security.MessageDigest; 025 import org.apache.commons.logging.*; 026 import org.apache.hadoop.util.Options; 027 import org.apache.hadoop.fs.*; 028 import org.apache.hadoop.fs.Options.CreateOpts; 029 import org.apache.hadoop.io.compress.CodecPool; 030 import org.apache.hadoop.io.compress.CompressionCodec; 031 import org.apache.hadoop.io.compress.CompressionInputStream; 032 import org.apache.hadoop.io.compress.CompressionOutputStream; 033 import org.apache.hadoop.io.compress.Compressor; 034 import org.apache.hadoop.io.compress.Decompressor; 035 import org.apache.hadoop.io.compress.DefaultCodec; 036 import org.apache.hadoop.io.compress.GzipCodec; 037 import org.apache.hadoop.io.compress.zlib.ZlibFactory; 038 import org.apache.hadoop.io.serializer.Deserializer; 039 import org.apache.hadoop.io.serializer.Serializer; 040 import org.apache.hadoop.io.serializer.SerializationFactory; 041 import org.apache.hadoop.classification.InterfaceAudience; 042 import org.apache.hadoop.classification.InterfaceStability; 043 import org.apache.hadoop.conf.*; 044 import org.apache.hadoop.util.Progressable; 045 import org.apache.hadoop.util.Progress; 046 import org.apache.hadoop.util.ReflectionUtils; 047 import org.apache.hadoop.util.NativeCodeLoader; 048 import org.apache.hadoop.util.MergeSort; 049 import org.apache.hadoop.util.PriorityQueue; 050 import org.apache.hadoop.util.Time; 051 052 /** 053 * <code>SequenceFile</code>s are flat files consisting of binary key/value 054 * pairs. 055 * 056 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer}, 057 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing, 058 * reading and sorting respectively.</p> 059 * 060 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 061 * {@link CompressionType} used to compress key/value pairs: 062 * <ol> 063 * <li> 064 * <code>Writer</code> : Uncompressed records. 065 * </li> 066 * <li> 067 * <code>RecordCompressWriter</code> : Record-compressed files, only compress 068 * values. 069 * </li> 070 * <li> 071 * <code>BlockCompressWriter</code> : Block-compressed files, both keys & 072 * values are collected in 'blocks' 073 * separately and compressed. The size of 074 * the 'block' is configurable. 075 * </ol> 076 * 077 * <p>The actual compression algorithm used to compress key and/or values can be 078 * specified by using the appropriate {@link CompressionCodec}.</p> 079 * 080 * <p>The recommended way is to use the static <tt>createWriter</tt> methods 081 * provided by the <code>SequenceFile</code> to chose the preferred format.</p> 082 * 083 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the 084 * above <code>SequenceFile</code> formats.</p> 085 * 086 * <h4 id="Formats">SequenceFile Formats</h4> 087 * 088 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s 089 * depending on the <code>CompressionType</code> specified. All of them share a 090 * <a href="#Header">common header</a> described below. 091 * 092 * <h5 id="Header">SequenceFile Header</h5> 093 * <ul> 094 * <li> 095 * version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 096 * version number (e.g. SEQ4 or SEQ6) 097 * </li> 098 * <li> 099 * keyClassName -key class 100 * </li> 101 * <li> 102 * valueClassName - value class 103 * </li> 104 * <li> 105 * compression - A boolean which specifies if compression is turned on for 106 * keys/values in this file. 107 * </li> 108 * <li> 109 * blockCompression - A boolean which specifies if block-compression is 110 * turned on for keys/values in this file. 111 * </li> 112 * <li> 113 * compression codec - <code>CompressionCodec</code> class which is used for 114 * compression of keys and/or values (if compression is 115 * enabled). 116 * </li> 117 * <li> 118 * metadata - {@link Metadata} for this file. 119 * </li> 120 * <li> 121 * sync - A sync marker to denote end of the header. 122 * </li> 123 * </ul> 124 * 125 * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5> 126 * <ul> 127 * <li> 128 * <a href="#Header">Header</a> 129 * </li> 130 * <li> 131 * Record 132 * <ul> 133 * <li>Record length</li> 134 * <li>Key length</li> 135 * <li>Key</li> 136 * <li>Value</li> 137 * </ul> 138 * </li> 139 * <li> 140 * A sync-marker every few <code>100</code> bytes or so. 141 * </li> 142 * </ul> 143 * 144 * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5> 145 * <ul> 146 * <li> 147 * <a href="#Header">Header</a> 148 * </li> 149 * <li> 150 * Record 151 * <ul> 152 * <li>Record length</li> 153 * <li>Key length</li> 154 * <li>Key</li> 155 * <li><i>Compressed</i> Value</li> 156 * </ul> 157 * </li> 158 * <li> 159 * A sync-marker every few <code>100</code> bytes or so. 160 * </li> 161 * </ul> 162 * 163 * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5> 164 * <ul> 165 * <li> 166 * <a href="#Header">Header</a> 167 * </li> 168 * <li> 169 * Record <i>Block</i> 170 * <ul> 171 * <li>Uncompressed number of records in the block</li> 172 * <li>Compressed key-lengths block-size</li> 173 * <li>Compressed key-lengths block</li> 174 * <li>Compressed keys block-size</li> 175 * <li>Compressed keys block</li> 176 * <li>Compressed value-lengths block-size</li> 177 * <li>Compressed value-lengths block</li> 178 * <li>Compressed values block-size</li> 179 * <li>Compressed values block</li> 180 * </ul> 181 * </li> 182 * <li> 183 * A sync-marker every block. 184 * </li> 185 * </ul> 186 * 187 * <p>The compressed blocks of key lengths and value lengths consist of the 188 * actual lengths of individual keys/values encoded in ZeroCompressedInteger 189 * format.</p> 190 * 191 * @see CompressionCodec 192 */ 193 @InterfaceAudience.Public 194 @InterfaceStability.Stable 195 public class SequenceFile { 196 private static final Log LOG = LogFactory.getLog(SequenceFile.class); 197 198 private SequenceFile() {} // no public ctor 199 200 private static final byte BLOCK_COMPRESS_VERSION = (byte)4; 201 private static final byte CUSTOM_COMPRESS_VERSION = (byte)5; 202 private static final byte VERSION_WITH_METADATA = (byte)6; 203 private static byte[] VERSION = new byte[] { 204 (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA 205 }; 206 207 private static final int SYNC_ESCAPE = -1; // "length" of sync entries 208 private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash 209 private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash 210 211 /** The number of bytes between sync points.*/ 212 public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 213 214 /** 215 * The compression type used to compress key/value pairs in the 216 * {@link SequenceFile}. 217 * 218 * @see SequenceFile.Writer 219 */ 220 public static enum CompressionType { 221 /** Do not compress records. */ 222 NONE, 223 /** Compress values only, each separately. */ 224 RECORD, 225 /** Compress sequences of records together in blocks. */ 226 BLOCK 227 } 228 229 /** 230 * Get the compression type for the reduce outputs 231 * @param job the job config to look in 232 * @return the kind of compression to use 233 */ 234 static public CompressionType getDefaultCompressionType(Configuration job) { 235 String name = job.get("io.seqfile.compression.type"); 236 return name == null ? CompressionType.RECORD : 237 CompressionType.valueOf(name); 238 } 239 240 /** 241 * Set the default compression type for sequence files. 242 * @param job the configuration to modify 243 * @param val the new compression type (none, block, record) 244 */ 245 static public void setDefaultCompressionType(Configuration job, 246 CompressionType val) { 247 job.set("io.seqfile.compression.type", val.toString()); 248 } 249 250 /** 251 * Create a new Writer with the given options. 252 * @param conf the configuration to use 253 * @param opts the options to create the file with 254 * @return a new Writer 255 * @throws IOException 256 */ 257 public static Writer createWriter(Configuration conf, Writer.Option... opts 258 ) throws IOException { 259 Writer.CompressionOption compressionOption = 260 Options.getOption(Writer.CompressionOption.class, opts); 261 CompressionType kind; 262 if (compressionOption != null) { 263 kind = compressionOption.getValue(); 264 } else { 265 kind = getDefaultCompressionType(conf); 266 opts = Options.prependOptions(opts, Writer.compression(kind)); 267 } 268 switch (kind) { 269 default: 270 case NONE: 271 return new Writer(conf, opts); 272 case RECORD: 273 return new RecordCompressWriter(conf, opts); 274 case BLOCK: 275 return new BlockCompressWriter(conf, opts); 276 } 277 } 278 279 /** 280 * Construct the preferred type of SequenceFile Writer. 281 * @param fs The configured filesystem. 282 * @param conf The configuration. 283 * @param name The name of the file. 284 * @param keyClass The 'key' type. 285 * @param valClass The 'value' type. 286 * @return Returns the handle to the constructed SequenceFile Writer. 287 * @throws IOException 288 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 289 * instead. 290 */ 291 @Deprecated 292 public static Writer 293 createWriter(FileSystem fs, Configuration conf, Path name, 294 Class keyClass, Class valClass) throws IOException { 295 return createWriter(conf, Writer.filesystem(fs), 296 Writer.file(name), Writer.keyClass(keyClass), 297 Writer.valueClass(valClass)); 298 } 299 300 /** 301 * Construct the preferred type of SequenceFile Writer. 302 * @param fs The configured filesystem. 303 * @param conf The configuration. 304 * @param name The name of the file. 305 * @param keyClass The 'key' type. 306 * @param valClass The 'value' type. 307 * @param compressionType The compression type. 308 * @return Returns the handle to the constructed SequenceFile Writer. 309 * @throws IOException 310 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 311 * instead. 312 */ 313 @Deprecated 314 public static Writer 315 createWriter(FileSystem fs, Configuration conf, Path name, 316 Class keyClass, Class valClass, 317 CompressionType compressionType) throws IOException { 318 return createWriter(conf, Writer.filesystem(fs), 319 Writer.file(name), Writer.keyClass(keyClass), 320 Writer.valueClass(valClass), 321 Writer.compression(compressionType)); 322 } 323 324 /** 325 * Construct the preferred type of SequenceFile Writer. 326 * @param fs The configured filesystem. 327 * @param conf The configuration. 328 * @param name The name of the file. 329 * @param keyClass The 'key' type. 330 * @param valClass The 'value' type. 331 * @param compressionType The compression type. 332 * @param progress The Progressable object to track progress. 333 * @return Returns the handle to the constructed SequenceFile Writer. 334 * @throws IOException 335 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 336 * instead. 337 */ 338 @Deprecated 339 public static Writer 340 createWriter(FileSystem fs, Configuration conf, Path name, 341 Class keyClass, Class valClass, CompressionType compressionType, 342 Progressable progress) throws IOException { 343 return createWriter(conf, Writer.file(name), 344 Writer.filesystem(fs), 345 Writer.keyClass(keyClass), 346 Writer.valueClass(valClass), 347 Writer.compression(compressionType), 348 Writer.progressable(progress)); 349 } 350 351 /** 352 * Construct the preferred type of SequenceFile Writer. 353 * @param fs The configured filesystem. 354 * @param conf The configuration. 355 * @param name The name of the file. 356 * @param keyClass The 'key' type. 357 * @param valClass The 'value' type. 358 * @param compressionType The compression type. 359 * @param codec The compression codec. 360 * @return Returns the handle to the constructed SequenceFile Writer. 361 * @throws IOException 362 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 363 * instead. 364 */ 365 @Deprecated 366 public static Writer 367 createWriter(FileSystem fs, Configuration conf, Path name, 368 Class keyClass, Class valClass, CompressionType compressionType, 369 CompressionCodec codec) throws IOException { 370 return createWriter(conf, Writer.file(name), 371 Writer.filesystem(fs), 372 Writer.keyClass(keyClass), 373 Writer.valueClass(valClass), 374 Writer.compression(compressionType, codec)); 375 } 376 377 /** 378 * Construct the preferred type of SequenceFile Writer. 379 * @param fs The configured filesystem. 380 * @param conf The configuration. 381 * @param name The name of the file. 382 * @param keyClass The 'key' type. 383 * @param valClass The 'value' type. 384 * @param compressionType The compression type. 385 * @param codec The compression codec. 386 * @param progress The Progressable object to track progress. 387 * @param metadata The metadata of the file. 388 * @return Returns the handle to the constructed SequenceFile Writer. 389 * @throws IOException 390 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 391 * instead. 392 */ 393 @Deprecated 394 public static Writer 395 createWriter(FileSystem fs, Configuration conf, Path name, 396 Class keyClass, Class valClass, 397 CompressionType compressionType, CompressionCodec codec, 398 Progressable progress, Metadata metadata) throws IOException { 399 return createWriter(conf, Writer.file(name), 400 Writer.filesystem(fs), 401 Writer.keyClass(keyClass), 402 Writer.valueClass(valClass), 403 Writer.compression(compressionType, codec), 404 Writer.progressable(progress), 405 Writer.metadata(metadata)); 406 } 407 408 /** 409 * Construct the preferred type of SequenceFile Writer. 410 * @param fs The configured filesystem. 411 * @param conf The configuration. 412 * @param name The name of the file. 413 * @param keyClass The 'key' type. 414 * @param valClass The 'value' type. 415 * @param bufferSize buffer size for the underlaying outputstream. 416 * @param replication replication factor for the file. 417 * @param blockSize block size for the file. 418 * @param compressionType The compression type. 419 * @param codec The compression codec. 420 * @param progress The Progressable object to track progress. 421 * @param metadata The metadata of the file. 422 * @return Returns the handle to the constructed SequenceFile Writer. 423 * @throws IOException 424 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 425 * instead. 426 */ 427 @Deprecated 428 public static Writer 429 createWriter(FileSystem fs, Configuration conf, Path name, 430 Class keyClass, Class valClass, int bufferSize, 431 short replication, long blockSize, 432 CompressionType compressionType, CompressionCodec codec, 433 Progressable progress, Metadata metadata) throws IOException { 434 return createWriter(conf, Writer.file(name), 435 Writer.filesystem(fs), 436 Writer.keyClass(keyClass), 437 Writer.valueClass(valClass), 438 Writer.bufferSize(bufferSize), 439 Writer.replication(replication), 440 Writer.blockSize(blockSize), 441 Writer.compression(compressionType, codec), 442 Writer.progressable(progress), 443 Writer.metadata(metadata)); 444 } 445 446 /** 447 * Construct the preferred type of SequenceFile Writer. 448 * @param fs The configured filesystem. 449 * @param conf The configuration. 450 * @param name The name of the file. 451 * @param keyClass The 'key' type. 452 * @param valClass The 'value' type. 453 * @param bufferSize buffer size for the underlaying outputstream. 454 * @param replication replication factor for the file. 455 * @param blockSize block size for the file. 456 * @param createParent create parent directory if non-existent 457 * @param compressionType The compression type. 458 * @param codec The compression codec. 459 * @param metadata The metadata of the file. 460 * @return Returns the handle to the constructed SequenceFile Writer. 461 * @throws IOException 462 */ 463 @Deprecated 464 public static Writer 465 createWriter(FileSystem fs, Configuration conf, Path name, 466 Class keyClass, Class valClass, int bufferSize, 467 short replication, long blockSize, boolean createParent, 468 CompressionType compressionType, CompressionCodec codec, 469 Metadata metadata) throws IOException { 470 return createWriter(FileContext.getFileContext(fs.getUri(), conf), 471 conf, name, keyClass, valClass, compressionType, codec, 472 metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE), 473 CreateOpts.bufferSize(bufferSize), 474 createParent ? CreateOpts.createParent() 475 : CreateOpts.donotCreateParent(), 476 CreateOpts.repFac(replication), 477 CreateOpts.blockSize(blockSize) 478 ); 479 } 480 481 /** 482 * Construct the preferred type of SequenceFile Writer. 483 * @param fc The context for the specified file. 484 * @param conf The configuration. 485 * @param name The name of the file. 486 * @param keyClass The 'key' type. 487 * @param valClass The 'value' type. 488 * @param compressionType The compression type. 489 * @param codec The compression codec. 490 * @param metadata The metadata of the file. 491 * @param createFlag gives the semantics of create: overwrite, append etc. 492 * @param opts file creation options; see {@link CreateOpts}. 493 * @return Returns the handle to the constructed SequenceFile Writer. 494 * @throws IOException 495 */ 496 public static Writer 497 createWriter(FileContext fc, Configuration conf, Path name, 498 Class keyClass, Class valClass, 499 CompressionType compressionType, CompressionCodec codec, 500 Metadata metadata, 501 final EnumSet<CreateFlag> createFlag, CreateOpts... opts) 502 throws IOException { 503 return createWriter(conf, fc.create(name, createFlag, opts), 504 keyClass, valClass, compressionType, codec, metadata).ownStream(); 505 } 506 507 /** 508 * Construct the preferred type of SequenceFile Writer. 509 * @param fs The configured filesystem. 510 * @param conf The configuration. 511 * @param name The name of the file. 512 * @param keyClass The 'key' type. 513 * @param valClass The 'value' type. 514 * @param compressionType The compression type. 515 * @param codec The compression codec. 516 * @param progress The Progressable object to track progress. 517 * @return Returns the handle to the constructed SequenceFile Writer. 518 * @throws IOException 519 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 520 * instead. 521 */ 522 @Deprecated 523 public static Writer 524 createWriter(FileSystem fs, Configuration conf, Path name, 525 Class keyClass, Class valClass, 526 CompressionType compressionType, CompressionCodec codec, 527 Progressable progress) throws IOException { 528 return createWriter(conf, Writer.file(name), 529 Writer.filesystem(fs), 530 Writer.keyClass(keyClass), 531 Writer.valueClass(valClass), 532 Writer.compression(compressionType, codec), 533 Writer.progressable(progress)); 534 } 535 536 /** 537 * Construct the preferred type of 'raw' SequenceFile Writer. 538 * @param conf The configuration. 539 * @param out The stream on top which the writer is to be constructed. 540 * @param keyClass The 'key' type. 541 * @param valClass The 'value' type. 542 * @param compressionType The compression type. 543 * @param codec The compression codec. 544 * @param metadata The metadata of the file. 545 * @return Returns the handle to the constructed SequenceFile Writer. 546 * @throws IOException 547 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 548 * instead. 549 */ 550 @Deprecated 551 public static Writer 552 createWriter(Configuration conf, FSDataOutputStream out, 553 Class keyClass, Class valClass, 554 CompressionType compressionType, 555 CompressionCodec codec, Metadata metadata) throws IOException { 556 return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), 557 Writer.valueClass(valClass), 558 Writer.compression(compressionType, codec), 559 Writer.metadata(metadata)); 560 } 561 562 /** 563 * Construct the preferred type of 'raw' SequenceFile Writer. 564 * @param conf The configuration. 565 * @param out The stream on top which the writer is to be constructed. 566 * @param keyClass The 'key' type. 567 * @param valClass The 'value' type. 568 * @param compressionType The compression type. 569 * @param codec The compression codec. 570 * @return Returns the handle to the constructed SequenceFile Writer. 571 * @throws IOException 572 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 573 * instead. 574 */ 575 @Deprecated 576 public static Writer 577 createWriter(Configuration conf, FSDataOutputStream out, 578 Class keyClass, Class valClass, CompressionType compressionType, 579 CompressionCodec codec) throws IOException { 580 return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), 581 Writer.valueClass(valClass), 582 Writer.compression(compressionType, codec)); 583 } 584 585 586 /** The interface to 'raw' values of SequenceFiles. */ 587 public static interface ValueBytes { 588 589 /** Writes the uncompressed bytes to the outStream. 590 * @param outStream : Stream to write uncompressed bytes into. 591 * @throws IOException 592 */ 593 public void writeUncompressedBytes(DataOutputStream outStream) 594 throws IOException; 595 596 /** Write compressed bytes to outStream. 597 * Note: that it will NOT compress the bytes if they are not compressed. 598 * @param outStream : Stream to write compressed bytes into. 599 */ 600 public void writeCompressedBytes(DataOutputStream outStream) 601 throws IllegalArgumentException, IOException; 602 603 /** 604 * Size of stored data. 605 */ 606 public int getSize(); 607 } 608 609 private static class UncompressedBytes implements ValueBytes { 610 private int dataSize; 611 private byte[] data; 612 613 private UncompressedBytes() { 614 data = null; 615 dataSize = 0; 616 } 617 618 private void reset(DataInputStream in, int length) throws IOException { 619 if (data == null) { 620 data = new byte[length]; 621 } else if (length > data.length) { 622 data = new byte[Math.max(length, data.length * 2)]; 623 } 624 dataSize = -1; 625 in.readFully(data, 0, length); 626 dataSize = length; 627 } 628 629 @Override 630 public int getSize() { 631 return dataSize; 632 } 633 634 @Override 635 public void writeUncompressedBytes(DataOutputStream outStream) 636 throws IOException { 637 outStream.write(data, 0, dataSize); 638 } 639 640 @Override 641 public void writeCompressedBytes(DataOutputStream outStream) 642 throws IllegalArgumentException, IOException { 643 throw 644 new IllegalArgumentException("UncompressedBytes cannot be compressed!"); 645 } 646 647 } // UncompressedBytes 648 649 private static class CompressedBytes implements ValueBytes { 650 private int dataSize; 651 private byte[] data; 652 DataInputBuffer rawData = null; 653 CompressionCodec codec = null; 654 CompressionInputStream decompressedStream = null; 655 656 private CompressedBytes(CompressionCodec codec) { 657 data = null; 658 dataSize = 0; 659 this.codec = codec; 660 } 661 662 private void reset(DataInputStream in, int length) throws IOException { 663 if (data == null) { 664 data = new byte[length]; 665 } else if (length > data.length) { 666 data = new byte[Math.max(length, data.length * 2)]; 667 } 668 dataSize = -1; 669 in.readFully(data, 0, length); 670 dataSize = length; 671 } 672 673 @Override 674 public int getSize() { 675 return dataSize; 676 } 677 678 @Override 679 public void writeUncompressedBytes(DataOutputStream outStream) 680 throws IOException { 681 if (decompressedStream == null) { 682 rawData = new DataInputBuffer(); 683 decompressedStream = codec.createInputStream(rawData); 684 } else { 685 decompressedStream.resetState(); 686 } 687 rawData.reset(data, 0, dataSize); 688 689 byte[] buffer = new byte[8192]; 690 int bytesRead = 0; 691 while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) { 692 outStream.write(buffer, 0, bytesRead); 693 } 694 } 695 696 @Override 697 public void writeCompressedBytes(DataOutputStream outStream) 698 throws IllegalArgumentException, IOException { 699 outStream.write(data, 0, dataSize); 700 } 701 702 } // CompressedBytes 703 704 /** 705 * The class encapsulating with the metadata of a file. 706 * The metadata of a file is a list of attribute name/value 707 * pairs of Text type. 708 * 709 */ 710 public static class Metadata implements Writable { 711 712 private TreeMap<Text, Text> theMetadata; 713 714 public Metadata() { 715 this(new TreeMap<Text, Text>()); 716 } 717 718 public Metadata(TreeMap<Text, Text> arg) { 719 if (arg == null) { 720 this.theMetadata = new TreeMap<Text, Text>(); 721 } else { 722 this.theMetadata = arg; 723 } 724 } 725 726 public Text get(Text name) { 727 return this.theMetadata.get(name); 728 } 729 730 public void set(Text name, Text value) { 731 this.theMetadata.put(name, value); 732 } 733 734 public TreeMap<Text, Text> getMetadata() { 735 return new TreeMap<Text, Text>(this.theMetadata); 736 } 737 738 @Override 739 public void write(DataOutput out) throws IOException { 740 out.writeInt(this.theMetadata.size()); 741 Iterator<Map.Entry<Text, Text>> iter = 742 this.theMetadata.entrySet().iterator(); 743 while (iter.hasNext()) { 744 Map.Entry<Text, Text> en = iter.next(); 745 en.getKey().write(out); 746 en.getValue().write(out); 747 } 748 } 749 750 @Override 751 public void readFields(DataInput in) throws IOException { 752 int sz = in.readInt(); 753 if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object"); 754 this.theMetadata = new TreeMap<Text, Text>(); 755 for (int i = 0; i < sz; i++) { 756 Text key = new Text(); 757 Text val = new Text(); 758 key.readFields(in); 759 val.readFields(in); 760 this.theMetadata.put(key, val); 761 } 762 } 763 764 @Override 765 public boolean equals(Object other) { 766 if (other == null) { 767 return false; 768 } 769 if (other.getClass() != this.getClass()) { 770 return false; 771 } else { 772 return equals((Metadata)other); 773 } 774 } 775 776 public boolean equals(Metadata other) { 777 if (other == null) return false; 778 if (this.theMetadata.size() != other.theMetadata.size()) { 779 return false; 780 } 781 Iterator<Map.Entry<Text, Text>> iter1 = 782 this.theMetadata.entrySet().iterator(); 783 Iterator<Map.Entry<Text, Text>> iter2 = 784 other.theMetadata.entrySet().iterator(); 785 while (iter1.hasNext() && iter2.hasNext()) { 786 Map.Entry<Text, Text> en1 = iter1.next(); 787 Map.Entry<Text, Text> en2 = iter2.next(); 788 if (!en1.getKey().equals(en2.getKey())) { 789 return false; 790 } 791 if (!en1.getValue().equals(en2.getValue())) { 792 return false; 793 } 794 } 795 if (iter1.hasNext() || iter2.hasNext()) { 796 return false; 797 } 798 return true; 799 } 800 801 @Override 802 public int hashCode() { 803 assert false : "hashCode not designed"; 804 return 42; // any arbitrary constant will do 805 } 806 807 @Override 808 public String toString() { 809 StringBuilder sb = new StringBuilder(); 810 sb.append("size: ").append(this.theMetadata.size()).append("\n"); 811 Iterator<Map.Entry<Text, Text>> iter = 812 this.theMetadata.entrySet().iterator(); 813 while (iter.hasNext()) { 814 Map.Entry<Text, Text> en = iter.next(); 815 sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString()); 816 sb.append("\n"); 817 } 818 return sb.toString(); 819 } 820 } 821 822 /** Write key/value pairs to a sequence-format file. */ 823 public static class Writer implements java.io.Closeable, Syncable { 824 private Configuration conf; 825 FSDataOutputStream out; 826 boolean ownOutputStream = true; 827 DataOutputBuffer buffer = new DataOutputBuffer(); 828 829 Class keyClass; 830 Class valClass; 831 832 private final CompressionType compress; 833 CompressionCodec codec = null; 834 CompressionOutputStream deflateFilter = null; 835 DataOutputStream deflateOut = null; 836 Metadata metadata = null; 837 Compressor compressor = null; 838 839 protected Serializer keySerializer; 840 protected Serializer uncompressedValSerializer; 841 protected Serializer compressedValSerializer; 842 843 // Insert a globally unique 16-byte value every few entries, so that one 844 // can seek into the middle of a file and then synchronize with record 845 // starts and ends by scanning for this value. 846 long lastSyncPos; // position of last sync 847 byte[] sync; // 16 random bytes 848 { 849 try { 850 MessageDigest digester = MessageDigest.getInstance("MD5"); 851 long time = Time.now(); 852 digester.update((new UID()+"@"+time).getBytes()); 853 sync = digester.digest(); 854 } catch (Exception e) { 855 throw new RuntimeException(e); 856 } 857 } 858 859 public static interface Option {} 860 861 static class FileOption extends Options.PathOption 862 implements Option { 863 FileOption(Path path) { 864 super(path); 865 } 866 } 867 868 /** 869 * @deprecated only used for backwards-compatibility in the createWriter methods 870 * that take FileSystem. 871 */ 872 @Deprecated 873 private static class FileSystemOption implements Option { 874 private final FileSystem value; 875 protected FileSystemOption(FileSystem value) { 876 this.value = value; 877 } 878 public FileSystem getValue() { 879 return value; 880 } 881 } 882 883 static class StreamOption extends Options.FSDataOutputStreamOption 884 implements Option { 885 StreamOption(FSDataOutputStream stream) { 886 super(stream); 887 } 888 } 889 890 static class BufferSizeOption extends Options.IntegerOption 891 implements Option { 892 BufferSizeOption(int value) { 893 super(value); 894 } 895 } 896 897 static class BlockSizeOption extends Options.LongOption implements Option { 898 BlockSizeOption(long value) { 899 super(value); 900 } 901 } 902 903 static class ReplicationOption extends Options.IntegerOption 904 implements Option { 905 ReplicationOption(int value) { 906 super(value); 907 } 908 } 909 910 static class KeyClassOption extends Options.ClassOption implements Option { 911 KeyClassOption(Class<?> value) { 912 super(value); 913 } 914 } 915 916 static class ValueClassOption extends Options.ClassOption 917 implements Option { 918 ValueClassOption(Class<?> value) { 919 super(value); 920 } 921 } 922 923 static class MetadataOption implements Option { 924 private final Metadata value; 925 MetadataOption(Metadata value) { 926 this.value = value; 927 } 928 Metadata getValue() { 929 return value; 930 } 931 } 932 933 static class ProgressableOption extends Options.ProgressableOption 934 implements Option { 935 ProgressableOption(Progressable value) { 936 super(value); 937 } 938 } 939 940 private static class CompressionOption implements Option { 941 private final CompressionType value; 942 private final CompressionCodec codec; 943 CompressionOption(CompressionType value) { 944 this(value, null); 945 } 946 CompressionOption(CompressionType value, CompressionCodec codec) { 947 this.value = value; 948 this.codec = (CompressionType.NONE != value && null == codec) 949 ? new DefaultCodec() 950 : codec; 951 } 952 CompressionType getValue() { 953 return value; 954 } 955 CompressionCodec getCodec() { 956 return codec; 957 } 958 } 959 960 public static Option file(Path value) { 961 return new FileOption(value); 962 } 963 964 /** 965 * @deprecated only used for backwards-compatibility in the createWriter methods 966 * that take FileSystem. 967 */ 968 @Deprecated 969 private static Option filesystem(FileSystem fs) { 970 return new SequenceFile.Writer.FileSystemOption(fs); 971 } 972 973 public static Option bufferSize(int value) { 974 return new BufferSizeOption(value); 975 } 976 977 public static Option stream(FSDataOutputStream value) { 978 return new StreamOption(value); 979 } 980 981 public static Option replication(short value) { 982 return new ReplicationOption(value); 983 } 984 985 public static Option blockSize(long value) { 986 return new BlockSizeOption(value); 987 } 988 989 public static Option progressable(Progressable value) { 990 return new ProgressableOption(value); 991 } 992 993 public static Option keyClass(Class<?> value) { 994 return new KeyClassOption(value); 995 } 996 997 public static Option valueClass(Class<?> value) { 998 return new ValueClassOption(value); 999 } 1000 1001 public static Option metadata(Metadata value) { 1002 return new MetadataOption(value); 1003 } 1004 1005 public static Option compression(CompressionType value) { 1006 return new CompressionOption(value); 1007 } 1008 1009 public static Option compression(CompressionType value, 1010 CompressionCodec codec) { 1011 return new CompressionOption(value, codec); 1012 } 1013 1014 /** 1015 * Construct a uncompressed writer from a set of options. 1016 * @param conf the configuration to use 1017 * @param options the options used when creating the writer 1018 * @throws IOException if it fails 1019 */ 1020 Writer(Configuration conf, 1021 Option... opts) throws IOException { 1022 BlockSizeOption blockSizeOption = 1023 Options.getOption(BlockSizeOption.class, opts); 1024 BufferSizeOption bufferSizeOption = 1025 Options.getOption(BufferSizeOption.class, opts); 1026 ReplicationOption replicationOption = 1027 Options.getOption(ReplicationOption.class, opts); 1028 ProgressableOption progressOption = 1029 Options.getOption(ProgressableOption.class, opts); 1030 FileOption fileOption = Options.getOption(FileOption.class, opts); 1031 FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts); 1032 StreamOption streamOption = Options.getOption(StreamOption.class, opts); 1033 KeyClassOption keyClassOption = 1034 Options.getOption(KeyClassOption.class, opts); 1035 ValueClassOption valueClassOption = 1036 Options.getOption(ValueClassOption.class, opts); 1037 MetadataOption metadataOption = 1038 Options.getOption(MetadataOption.class, opts); 1039 CompressionOption compressionTypeOption = 1040 Options.getOption(CompressionOption.class, opts); 1041 // check consistency of options 1042 if ((fileOption == null) == (streamOption == null)) { 1043 throw new IllegalArgumentException("file or stream must be specified"); 1044 } 1045 if (fileOption == null && (blockSizeOption != null || 1046 bufferSizeOption != null || 1047 replicationOption != null || 1048 progressOption != null)) { 1049 throw new IllegalArgumentException("file modifier options not " + 1050 "compatible with stream"); 1051 } 1052 1053 FSDataOutputStream out; 1054 boolean ownStream = fileOption != null; 1055 if (ownStream) { 1056 Path p = fileOption.getValue(); 1057 FileSystem fs; 1058 if (fsOption != null) { 1059 fs = fsOption.getValue(); 1060 } else { 1061 fs = p.getFileSystem(conf); 1062 } 1063 int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : 1064 bufferSizeOption.getValue(); 1065 short replication = replicationOption == null ? 1066 fs.getDefaultReplication(p) : 1067 (short) replicationOption.getValue(); 1068 long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : 1069 blockSizeOption.getValue(); 1070 Progressable progress = progressOption == null ? null : 1071 progressOption.getValue(); 1072 out = fs.create(p, true, bufferSize, replication, blockSize, progress); 1073 } else { 1074 out = streamOption.getValue(); 1075 } 1076 Class<?> keyClass = keyClassOption == null ? 1077 Object.class : keyClassOption.getValue(); 1078 Class<?> valueClass = valueClassOption == null ? 1079 Object.class : valueClassOption.getValue(); 1080 Metadata metadata = metadataOption == null ? 1081 new Metadata() : metadataOption.getValue(); 1082 this.compress = compressionTypeOption.getValue(); 1083 final CompressionCodec codec = compressionTypeOption.getCodec(); 1084 if (codec != null && 1085 (codec instanceof GzipCodec) && 1086 !NativeCodeLoader.isNativeCodeLoaded() && 1087 !ZlibFactory.isNativeZlibLoaded(conf)) { 1088 throw new IllegalArgumentException("SequenceFile doesn't work with " + 1089 "GzipCodec without native-hadoop " + 1090 "code!"); 1091 } 1092 init(conf, out, ownStream, keyClass, valueClass, codec, metadata); 1093 } 1094 1095 /** Create the named file. 1096 * @deprecated Use 1097 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 1098 * instead. 1099 */ 1100 @Deprecated 1101 public Writer(FileSystem fs, Configuration conf, Path name, 1102 Class keyClass, Class valClass) throws IOException { 1103 this.compress = CompressionType.NONE; 1104 init(conf, fs.create(name), true, keyClass, valClass, null, 1105 new Metadata()); 1106 } 1107 1108 /** Create the named file with write-progress reporter. 1109 * @deprecated Use 1110 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 1111 * instead. 1112 */ 1113 @Deprecated 1114 public Writer(FileSystem fs, Configuration conf, Path name, 1115 Class keyClass, Class valClass, 1116 Progressable progress, Metadata metadata) throws IOException { 1117 this.compress = CompressionType.NONE; 1118 init(conf, fs.create(name, progress), true, keyClass, valClass, 1119 null, metadata); 1120 } 1121 1122 /** Create the named file with write-progress reporter. 1123 * @deprecated Use 1124 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 1125 * instead. 1126 */ 1127 @Deprecated 1128 public Writer(FileSystem fs, Configuration conf, Path name, 1129 Class keyClass, Class valClass, 1130 int bufferSize, short replication, long blockSize, 1131 Progressable progress, Metadata metadata) throws IOException { 1132 this.compress = CompressionType.NONE; 1133 init(conf, 1134 fs.create(name, true, bufferSize, replication, blockSize, progress), 1135 true, keyClass, valClass, null, metadata); 1136 } 1137 1138 boolean isCompressed() { return compress != CompressionType.NONE; } 1139 boolean isBlockCompressed() { return compress == CompressionType.BLOCK; } 1140 1141 Writer ownStream() { this.ownOutputStream = true; return this; } 1142 1143 /** Write and flush the file header. */ 1144 private void writeFileHeader() 1145 throws IOException { 1146 out.write(VERSION); 1147 Text.writeString(out, keyClass.getName()); 1148 Text.writeString(out, valClass.getName()); 1149 1150 out.writeBoolean(this.isCompressed()); 1151 out.writeBoolean(this.isBlockCompressed()); 1152 1153 if (this.isCompressed()) { 1154 Text.writeString(out, (codec.getClass()).getName()); 1155 } 1156 this.metadata.write(out); 1157 out.write(sync); // write the sync bytes 1158 out.flush(); // flush header 1159 } 1160 1161 /** Initialize. */ 1162 @SuppressWarnings("unchecked") 1163 void init(Configuration conf, FSDataOutputStream out, boolean ownStream, 1164 Class keyClass, Class valClass, 1165 CompressionCodec codec, Metadata metadata) 1166 throws IOException { 1167 this.conf = conf; 1168 this.out = out; 1169 this.ownOutputStream = ownStream; 1170 this.keyClass = keyClass; 1171 this.valClass = valClass; 1172 this.codec = codec; 1173 this.metadata = metadata; 1174 SerializationFactory serializationFactory = new SerializationFactory(conf); 1175 this.keySerializer = serializationFactory.getSerializer(keyClass); 1176 if (this.keySerializer == null) { 1177 throw new IOException( 1178 "Could not find a serializer for the Key class: '" 1179 + keyClass.getCanonicalName() + "'. " 1180 + "Please ensure that the configuration '" + 1181 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1182 + "properly configured, if you're using" 1183 + "custom serialization."); 1184 } 1185 this.keySerializer.open(buffer); 1186 this.uncompressedValSerializer = serializationFactory.getSerializer(valClass); 1187 if (this.uncompressedValSerializer == null) { 1188 throw new IOException( 1189 "Could not find a serializer for the Value class: '" 1190 + valClass.getCanonicalName() + "'. " 1191 + "Please ensure that the configuration '" + 1192 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1193 + "properly configured, if you're using" 1194 + "custom serialization."); 1195 } 1196 this.uncompressedValSerializer.open(buffer); 1197 if (this.codec != null) { 1198 ReflectionUtils.setConf(this.codec, this.conf); 1199 this.compressor = CodecPool.getCompressor(this.codec); 1200 this.deflateFilter = this.codec.createOutputStream(buffer, compressor); 1201 this.deflateOut = 1202 new DataOutputStream(new BufferedOutputStream(deflateFilter)); 1203 this.compressedValSerializer = serializationFactory.getSerializer(valClass); 1204 if (this.compressedValSerializer == null) { 1205 throw new IOException( 1206 "Could not find a serializer for the Value class: '" 1207 + valClass.getCanonicalName() + "'. " 1208 + "Please ensure that the configuration '" + 1209 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1210 + "properly configured, if you're using" 1211 + "custom serialization."); 1212 } 1213 this.compressedValSerializer.open(deflateOut); 1214 } 1215 writeFileHeader(); 1216 } 1217 1218 /** Returns the class of keys in this file. */ 1219 public Class getKeyClass() { return keyClass; } 1220 1221 /** Returns the class of values in this file. */ 1222 public Class getValueClass() { return valClass; } 1223 1224 /** Returns the compression codec of data in this file. */ 1225 public CompressionCodec getCompressionCodec() { return codec; } 1226 1227 /** create a sync point */ 1228 public void sync() throws IOException { 1229 if (sync != null && lastSyncPos != out.getPos()) { 1230 out.writeInt(SYNC_ESCAPE); // mark the start of the sync 1231 out.write(sync); // write sync 1232 lastSyncPos = out.getPos(); // update lastSyncPos 1233 } 1234 } 1235 1236 /** 1237 * flush all currently written data to the file system 1238 * @deprecated Use {@link #hsync()} or {@link #hflush()} instead 1239 */ 1240 @Deprecated 1241 public void syncFs() throws IOException { 1242 if (out != null) { 1243 out.sync(); // flush contents to file system 1244 } 1245 } 1246 1247 @Override 1248 public void hsync() throws IOException { 1249 if (out != null) { 1250 out.hsync(); 1251 } 1252 } 1253 1254 @Override 1255 public void hflush() throws IOException { 1256 if (out != null) { 1257 out.hflush(); 1258 } 1259 } 1260 1261 /** Returns the configuration of this file. */ 1262 Configuration getConf() { return conf; } 1263 1264 /** Close the file. */ 1265 @Override 1266 public synchronized void close() throws IOException { 1267 keySerializer.close(); 1268 uncompressedValSerializer.close(); 1269 if (compressedValSerializer != null) { 1270 compressedValSerializer.close(); 1271 } 1272 1273 CodecPool.returnCompressor(compressor); 1274 compressor = null; 1275 1276 if (out != null) { 1277 1278 // Close the underlying stream iff we own it... 1279 if (ownOutputStream) { 1280 out.close(); 1281 } else { 1282 out.flush(); 1283 } 1284 out = null; 1285 } 1286 } 1287 1288 synchronized void checkAndWriteSync() throws IOException { 1289 if (sync != null && 1290 out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync 1291 sync(); 1292 } 1293 } 1294 1295 /** Append a key/value pair. */ 1296 public void append(Writable key, Writable val) 1297 throws IOException { 1298 append((Object) key, (Object) val); 1299 } 1300 1301 /** Append a key/value pair. */ 1302 @SuppressWarnings("unchecked") 1303 public synchronized void append(Object key, Object val) 1304 throws IOException { 1305 if (key.getClass() != keyClass) 1306 throw new IOException("wrong key class: "+key.getClass().getName() 1307 +" is not "+keyClass); 1308 if (val.getClass() != valClass) 1309 throw new IOException("wrong value class: "+val.getClass().getName() 1310 +" is not "+valClass); 1311 1312 buffer.reset(); 1313 1314 // Append the 'key' 1315 keySerializer.serialize(key); 1316 int keyLength = buffer.getLength(); 1317 if (keyLength < 0) 1318 throw new IOException("negative length keys not allowed: " + key); 1319 1320 // Append the 'value' 1321 if (compress == CompressionType.RECORD) { 1322 deflateFilter.resetState(); 1323 compressedValSerializer.serialize(val); 1324 deflateOut.flush(); 1325 deflateFilter.finish(); 1326 } else { 1327 uncompressedValSerializer.serialize(val); 1328 } 1329 1330 // Write the record out 1331 checkAndWriteSync(); // sync 1332 out.writeInt(buffer.getLength()); // total record length 1333 out.writeInt(keyLength); // key portion length 1334 out.write(buffer.getData(), 0, buffer.getLength()); // data 1335 } 1336 1337 public synchronized void appendRaw(byte[] keyData, int keyOffset, 1338 int keyLength, ValueBytes val) throws IOException { 1339 if (keyLength < 0) 1340 throw new IOException("negative length keys not allowed: " + keyLength); 1341 1342 int valLength = val.getSize(); 1343 1344 checkAndWriteSync(); 1345 1346 out.writeInt(keyLength+valLength); // total record length 1347 out.writeInt(keyLength); // key portion length 1348 out.write(keyData, keyOffset, keyLength); // key 1349 val.writeUncompressedBytes(out); // value 1350 } 1351 1352 /** Returns the current length of the output file. 1353 * 1354 * <p>This always returns a synchronized position. In other words, 1355 * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position 1356 * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However 1357 * the key may be earlier in the file than key last written when this 1358 * method was called (e.g., with block-compression, it may be the first key 1359 * in the block that was being written when this method was called). 1360 */ 1361 public synchronized long getLength() throws IOException { 1362 return out.getPos(); 1363 } 1364 1365 } // class Writer 1366 1367 /** Write key/compressed-value pairs to a sequence-format file. */ 1368 static class RecordCompressWriter extends Writer { 1369 1370 RecordCompressWriter(Configuration conf, 1371 Option... options) throws IOException { 1372 super(conf, options); 1373 } 1374 1375 /** Append a key/value pair. */ 1376 @Override 1377 @SuppressWarnings("unchecked") 1378 public synchronized void append(Object key, Object val) 1379 throws IOException { 1380 if (key.getClass() != keyClass) 1381 throw new IOException("wrong key class: "+key.getClass().getName() 1382 +" is not "+keyClass); 1383 if (val.getClass() != valClass) 1384 throw new IOException("wrong value class: "+val.getClass().getName() 1385 +" is not "+valClass); 1386 1387 buffer.reset(); 1388 1389 // Append the 'key' 1390 keySerializer.serialize(key); 1391 int keyLength = buffer.getLength(); 1392 if (keyLength < 0) 1393 throw new IOException("negative length keys not allowed: " + key); 1394 1395 // Compress 'value' and append it 1396 deflateFilter.resetState(); 1397 compressedValSerializer.serialize(val); 1398 deflateOut.flush(); 1399 deflateFilter.finish(); 1400 1401 // Write the record out 1402 checkAndWriteSync(); // sync 1403 out.writeInt(buffer.getLength()); // total record length 1404 out.writeInt(keyLength); // key portion length 1405 out.write(buffer.getData(), 0, buffer.getLength()); // data 1406 } 1407 1408 /** Append a key/value pair. */ 1409 @Override 1410 public synchronized void appendRaw(byte[] keyData, int keyOffset, 1411 int keyLength, ValueBytes val) throws IOException { 1412 1413 if (keyLength < 0) 1414 throw new IOException("negative length keys not allowed: " + keyLength); 1415 1416 int valLength = val.getSize(); 1417 1418 checkAndWriteSync(); // sync 1419 out.writeInt(keyLength+valLength); // total record length 1420 out.writeInt(keyLength); // key portion length 1421 out.write(keyData, keyOffset, keyLength); // 'key' data 1422 val.writeCompressedBytes(out); // 'value' data 1423 } 1424 1425 } // RecordCompressionWriter 1426 1427 /** Write compressed key/value blocks to a sequence-format file. */ 1428 static class BlockCompressWriter extends Writer { 1429 1430 private int noBufferedRecords = 0; 1431 1432 private DataOutputBuffer keyLenBuffer = new DataOutputBuffer(); 1433 private DataOutputBuffer keyBuffer = new DataOutputBuffer(); 1434 1435 private DataOutputBuffer valLenBuffer = new DataOutputBuffer(); 1436 private DataOutputBuffer valBuffer = new DataOutputBuffer(); 1437 1438 private final int compressionBlockSize; 1439 1440 BlockCompressWriter(Configuration conf, 1441 Option... options) throws IOException { 1442 super(conf, options); 1443 compressionBlockSize = 1444 conf.getInt("io.seqfile.compress.blocksize", 1000000); 1445 keySerializer.close(); 1446 keySerializer.open(keyBuffer); 1447 uncompressedValSerializer.close(); 1448 uncompressedValSerializer.open(valBuffer); 1449 } 1450 1451 /** Workhorse to check and write out compressed data/lengths */ 1452 private synchronized 1453 void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 1454 throws IOException { 1455 deflateFilter.resetState(); 1456 buffer.reset(); 1457 deflateOut.write(uncompressedDataBuffer.getData(), 0, 1458 uncompressedDataBuffer.getLength()); 1459 deflateOut.flush(); 1460 deflateFilter.finish(); 1461 1462 WritableUtils.writeVInt(out, buffer.getLength()); 1463 out.write(buffer.getData(), 0, buffer.getLength()); 1464 } 1465 1466 /** Compress and flush contents to dfs */ 1467 @Override 1468 public synchronized void sync() throws IOException { 1469 if (noBufferedRecords > 0) { 1470 super.sync(); 1471 1472 // No. of records 1473 WritableUtils.writeVInt(out, noBufferedRecords); 1474 1475 // Write 'keys' and lengths 1476 writeBuffer(keyLenBuffer); 1477 writeBuffer(keyBuffer); 1478 1479 // Write 'values' and lengths 1480 writeBuffer(valLenBuffer); 1481 writeBuffer(valBuffer); 1482 1483 // Flush the file-stream 1484 out.flush(); 1485 1486 // Reset internal states 1487 keyLenBuffer.reset(); 1488 keyBuffer.reset(); 1489 valLenBuffer.reset(); 1490 valBuffer.reset(); 1491 noBufferedRecords = 0; 1492 } 1493 1494 } 1495 1496 /** Close the file. */ 1497 @Override 1498 public synchronized void close() throws IOException { 1499 if (out != null) { 1500 sync(); 1501 } 1502 super.close(); 1503 } 1504 1505 /** Append a key/value pair. */ 1506 @Override 1507 @SuppressWarnings("unchecked") 1508 public synchronized void append(Object key, Object val) 1509 throws IOException { 1510 if (key.getClass() != keyClass) 1511 throw new IOException("wrong key class: "+key+" is not "+keyClass); 1512 if (val.getClass() != valClass) 1513 throw new IOException("wrong value class: "+val+" is not "+valClass); 1514 1515 // Save key/value into respective buffers 1516 int oldKeyLength = keyBuffer.getLength(); 1517 keySerializer.serialize(key); 1518 int keyLength = keyBuffer.getLength() - oldKeyLength; 1519 if (keyLength < 0) 1520 throw new IOException("negative length keys not allowed: " + key); 1521 WritableUtils.writeVInt(keyLenBuffer, keyLength); 1522 1523 int oldValLength = valBuffer.getLength(); 1524 uncompressedValSerializer.serialize(val); 1525 int valLength = valBuffer.getLength() - oldValLength; 1526 WritableUtils.writeVInt(valLenBuffer, valLength); 1527 1528 // Added another key/value pair 1529 ++noBufferedRecords; 1530 1531 // Compress and flush? 1532 int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 1533 if (currentBlockSize >= compressionBlockSize) { 1534 sync(); 1535 } 1536 } 1537 1538 /** Append a key/value pair. */ 1539 @Override 1540 public synchronized void appendRaw(byte[] keyData, int keyOffset, 1541 int keyLength, ValueBytes val) throws IOException { 1542 1543 if (keyLength < 0) 1544 throw new IOException("negative length keys not allowed"); 1545 1546 int valLength = val.getSize(); 1547 1548 // Save key/value data in relevant buffers 1549 WritableUtils.writeVInt(keyLenBuffer, keyLength); 1550 keyBuffer.write(keyData, keyOffset, keyLength); 1551 WritableUtils.writeVInt(valLenBuffer, valLength); 1552 val.writeUncompressedBytes(valBuffer); 1553 1554 // Added another key/value pair 1555 ++noBufferedRecords; 1556 1557 // Compress and flush? 1558 int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 1559 if (currentBlockSize >= compressionBlockSize) { 1560 sync(); 1561 } 1562 } 1563 1564 } // BlockCompressionWriter 1565 1566 /** Get the configured buffer size */ 1567 private static int getBufferSize(Configuration conf) { 1568 return conf.getInt("io.file.buffer.size", 4096); 1569 } 1570 1571 /** Reads key/value pairs from a sequence-format file. */ 1572 public static class Reader implements java.io.Closeable { 1573 private String filename; 1574 private FSDataInputStream in; 1575 private DataOutputBuffer outBuf = new DataOutputBuffer(); 1576 1577 private byte version; 1578 1579 private String keyClassName; 1580 private String valClassName; 1581 private Class keyClass; 1582 private Class valClass; 1583 1584 private CompressionCodec codec = null; 1585 private Metadata metadata = null; 1586 1587 private byte[] sync = new byte[SYNC_HASH_SIZE]; 1588 private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; 1589 private boolean syncSeen; 1590 1591 private long headerEnd; 1592 private long end; 1593 private int keyLength; 1594 private int recordLength; 1595 1596 private boolean decompress; 1597 private boolean blockCompressed; 1598 1599 private Configuration conf; 1600 1601 private int noBufferedRecords = 0; 1602 private boolean lazyDecompress = true; 1603 private boolean valuesDecompressed = true; 1604 1605 private int noBufferedKeys = 0; 1606 private int noBufferedValues = 0; 1607 1608 private DataInputBuffer keyLenBuffer = null; 1609 private CompressionInputStream keyLenInFilter = null; 1610 private DataInputStream keyLenIn = null; 1611 private Decompressor keyLenDecompressor = null; 1612 private DataInputBuffer keyBuffer = null; 1613 private CompressionInputStream keyInFilter = null; 1614 private DataInputStream keyIn = null; 1615 private Decompressor keyDecompressor = null; 1616 1617 private DataInputBuffer valLenBuffer = null; 1618 private CompressionInputStream valLenInFilter = null; 1619 private DataInputStream valLenIn = null; 1620 private Decompressor valLenDecompressor = null; 1621 private DataInputBuffer valBuffer = null; 1622 private CompressionInputStream valInFilter = null; 1623 private DataInputStream valIn = null; 1624 private Decompressor valDecompressor = null; 1625 1626 private Deserializer keyDeserializer; 1627 private Deserializer valDeserializer; 1628 1629 /** 1630 * A tag interface for all of the Reader options 1631 */ 1632 public static interface Option {} 1633 1634 /** 1635 * Create an option to specify the path name of the sequence file. 1636 * @param value the path to read 1637 * @return a new option 1638 */ 1639 public static Option file(Path value) { 1640 return new FileOption(value); 1641 } 1642 1643 /** 1644 * Create an option to specify the stream with the sequence file. 1645 * @param value the stream to read. 1646 * @return a new option 1647 */ 1648 public static Option stream(FSDataInputStream value) { 1649 return new InputStreamOption(value); 1650 } 1651 1652 /** 1653 * Create an option to specify the starting byte to read. 1654 * @param value the number of bytes to skip over 1655 * @return a new option 1656 */ 1657 public static Option start(long value) { 1658 return new StartOption(value); 1659 } 1660 1661 /** 1662 * Create an option to specify the number of bytes to read. 1663 * @param value the number of bytes to read 1664 * @return a new option 1665 */ 1666 public static Option length(long value) { 1667 return new LengthOption(value); 1668 } 1669 1670 /** 1671 * Create an option with the buffer size for reading the given pathname. 1672 * @param value the number of bytes to buffer 1673 * @return a new option 1674 */ 1675 public static Option bufferSize(int value) { 1676 return new BufferSizeOption(value); 1677 } 1678 1679 private static class FileOption extends Options.PathOption 1680 implements Option { 1681 private FileOption(Path value) { 1682 super(value); 1683 } 1684 } 1685 1686 private static class InputStreamOption 1687 extends Options.FSDataInputStreamOption 1688 implements Option { 1689 private InputStreamOption(FSDataInputStream value) { 1690 super(value); 1691 } 1692 } 1693 1694 private static class StartOption extends Options.LongOption 1695 implements Option { 1696 private StartOption(long value) { 1697 super(value); 1698 } 1699 } 1700 1701 private static class LengthOption extends Options.LongOption 1702 implements Option { 1703 private LengthOption(long value) { 1704 super(value); 1705 } 1706 } 1707 1708 private static class BufferSizeOption extends Options.IntegerOption 1709 implements Option { 1710 private BufferSizeOption(int value) { 1711 super(value); 1712 } 1713 } 1714 1715 // only used directly 1716 private static class OnlyHeaderOption extends Options.BooleanOption 1717 implements Option { 1718 private OnlyHeaderOption() { 1719 super(true); 1720 } 1721 } 1722 1723 public Reader(Configuration conf, Option... opts) throws IOException { 1724 // Look up the options, these are null if not set 1725 FileOption fileOpt = Options.getOption(FileOption.class, opts); 1726 InputStreamOption streamOpt = 1727 Options.getOption(InputStreamOption.class, opts); 1728 StartOption startOpt = Options.getOption(StartOption.class, opts); 1729 LengthOption lenOpt = Options.getOption(LengthOption.class, opts); 1730 BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); 1731 OnlyHeaderOption headerOnly = 1732 Options.getOption(OnlyHeaderOption.class, opts); 1733 // check for consistency 1734 if ((fileOpt == null) == (streamOpt == null)) { 1735 throw new 1736 IllegalArgumentException("File or stream option must be specified"); 1737 } 1738 if (fileOpt == null && bufOpt != null) { 1739 throw new IllegalArgumentException("buffer size can only be set when" + 1740 " a file is specified."); 1741 } 1742 // figure out the real values 1743 Path filename = null; 1744 FSDataInputStream file; 1745 final long len; 1746 if (fileOpt != null) { 1747 filename = fileOpt.getValue(); 1748 FileSystem fs = filename.getFileSystem(conf); 1749 int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); 1750 len = null == lenOpt 1751 ? fs.getFileStatus(filename).getLen() 1752 : lenOpt.getValue(); 1753 file = openFile(fs, filename, bufSize, len); 1754 } else { 1755 len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); 1756 file = streamOpt.getValue(); 1757 } 1758 long start = startOpt == null ? 0 : startOpt.getValue(); 1759 // really set up 1760 initialize(filename, file, start, len, conf, headerOnly != null); 1761 } 1762 1763 /** 1764 * Construct a reader by opening a file from the given file system. 1765 * @param fs The file system used to open the file. 1766 * @param file The file being read. 1767 * @param conf Configuration 1768 * @throws IOException 1769 * @deprecated Use Reader(Configuration, Option...) instead. 1770 */ 1771 @Deprecated 1772 public Reader(FileSystem fs, Path file, 1773 Configuration conf) throws IOException { 1774 this(conf, file(file.makeQualified(fs))); 1775 } 1776 1777 /** 1778 * Construct a reader by the given input stream. 1779 * @param in An input stream. 1780 * @param buffersize unused 1781 * @param start The starting position. 1782 * @param length The length being read. 1783 * @param conf Configuration 1784 * @throws IOException 1785 * @deprecated Use Reader(Configuration, Reader.Option...) instead. 1786 */ 1787 @Deprecated 1788 public Reader(FSDataInputStream in, int buffersize, 1789 long start, long length, Configuration conf) throws IOException { 1790 this(conf, stream(in), start(start), length(length)); 1791 } 1792 1793 /** Common work of the constructors. */ 1794 private void initialize(Path filename, FSDataInputStream in, 1795 long start, long length, Configuration conf, 1796 boolean tempReader) throws IOException { 1797 if (in == null) { 1798 throw new IllegalArgumentException("in == null"); 1799 } 1800 this.filename = filename == null ? "<unknown>" : filename.toString(); 1801 this.in = in; 1802 this.conf = conf; 1803 boolean succeeded = false; 1804 try { 1805 seek(start); 1806 this.end = this.in.getPos() + length; 1807 // if it wrapped around, use the max 1808 if (end < length) { 1809 end = Long.MAX_VALUE; 1810 } 1811 init(tempReader); 1812 succeeded = true; 1813 } finally { 1814 if (!succeeded) { 1815 IOUtils.cleanup(LOG, this.in); 1816 } 1817 } 1818 } 1819 1820 /** 1821 * Override this method to specialize the type of 1822 * {@link FSDataInputStream} returned. 1823 * @param fs The file system used to open the file. 1824 * @param file The file being read. 1825 * @param bufferSize The buffer size used to read the file. 1826 * @param length The length being read if it is >= 0. Otherwise, 1827 * the length is not available. 1828 * @return The opened stream. 1829 * @throws IOException 1830 */ 1831 protected FSDataInputStream openFile(FileSystem fs, Path file, 1832 int bufferSize, long length) throws IOException { 1833 return fs.open(file, bufferSize); 1834 } 1835 1836 /** 1837 * Initialize the {@link Reader} 1838 * @param tmpReader <code>true</code> if we are constructing a temporary 1839 * reader {@link SequenceFile.Sorter.cloneFileAttributes}, 1840 * and hence do not initialize every component; 1841 * <code>false</code> otherwise. 1842 * @throws IOException 1843 */ 1844 private void init(boolean tempReader) throws IOException { 1845 byte[] versionBlock = new byte[VERSION.length]; 1846 in.readFully(versionBlock); 1847 1848 if ((versionBlock[0] != VERSION[0]) || 1849 (versionBlock[1] != VERSION[1]) || 1850 (versionBlock[2] != VERSION[2])) 1851 throw new IOException(this + " not a SequenceFile"); 1852 1853 // Set 'version' 1854 version = versionBlock[3]; 1855 if (version > VERSION[3]) 1856 throw new VersionMismatchException(VERSION[3], version); 1857 1858 if (version < BLOCK_COMPRESS_VERSION) { 1859 UTF8 className = new UTF8(); 1860 1861 className.readFields(in); 1862 keyClassName = className.toStringChecked(); // key class name 1863 1864 className.readFields(in); 1865 valClassName = className.toStringChecked(); // val class name 1866 } else { 1867 keyClassName = Text.readString(in); 1868 valClassName = Text.readString(in); 1869 } 1870 1871 if (version > 2) { // if version > 2 1872 this.decompress = in.readBoolean(); // is compressed? 1873 } else { 1874 decompress = false; 1875 } 1876 1877 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 1878 this.blockCompressed = in.readBoolean(); // is block-compressed? 1879 } else { 1880 blockCompressed = false; 1881 } 1882 1883 // if version >= 5 1884 // setup the compression codec 1885 if (decompress) { 1886 if (version >= CUSTOM_COMPRESS_VERSION) { 1887 String codecClassname = Text.readString(in); 1888 try { 1889 Class<? extends CompressionCodec> codecClass 1890 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); 1891 this.codec = ReflectionUtils.newInstance(codecClass, conf); 1892 } catch (ClassNotFoundException cnfe) { 1893 throw new IllegalArgumentException("Unknown codec: " + 1894 codecClassname, cnfe); 1895 } 1896 } else { 1897 codec = new DefaultCodec(); 1898 ((Configurable)codec).setConf(conf); 1899 } 1900 } 1901 1902 this.metadata = new Metadata(); 1903 if (version >= VERSION_WITH_METADATA) { // if version >= 6 1904 this.metadata.readFields(in); 1905 } 1906 1907 if (version > 1) { // if version > 1 1908 in.readFully(sync); // read sync bytes 1909 headerEnd = in.getPos(); // record end of header 1910 } 1911 1912 // Initialize... *not* if this we are constructing a temporary Reader 1913 if (!tempReader) { 1914 valBuffer = new DataInputBuffer(); 1915 if (decompress) { 1916 valDecompressor = CodecPool.getDecompressor(codec); 1917 valInFilter = codec.createInputStream(valBuffer, valDecompressor); 1918 valIn = new DataInputStream(valInFilter); 1919 } else { 1920 valIn = valBuffer; 1921 } 1922 1923 if (blockCompressed) { 1924 keyLenBuffer = new DataInputBuffer(); 1925 keyBuffer = new DataInputBuffer(); 1926 valLenBuffer = new DataInputBuffer(); 1927 1928 keyLenDecompressor = CodecPool.getDecompressor(codec); 1929 keyLenInFilter = codec.createInputStream(keyLenBuffer, 1930 keyLenDecompressor); 1931 keyLenIn = new DataInputStream(keyLenInFilter); 1932 1933 keyDecompressor = CodecPool.getDecompressor(codec); 1934 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); 1935 keyIn = new DataInputStream(keyInFilter); 1936 1937 valLenDecompressor = CodecPool.getDecompressor(codec); 1938 valLenInFilter = codec.createInputStream(valLenBuffer, 1939 valLenDecompressor); 1940 valLenIn = new DataInputStream(valLenInFilter); 1941 } 1942 1943 SerializationFactory serializationFactory = 1944 new SerializationFactory(conf); 1945 this.keyDeserializer = 1946 getDeserializer(serializationFactory, getKeyClass()); 1947 if (this.keyDeserializer == null) { 1948 throw new IOException( 1949 "Could not find a deserializer for the Key class: '" 1950 + getKeyClass().getCanonicalName() + "'. " 1951 + "Please ensure that the configuration '" + 1952 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1953 + "properly configured, if you're using " 1954 + "custom serialization."); 1955 } 1956 if (!blockCompressed) { 1957 this.keyDeserializer.open(valBuffer); 1958 } else { 1959 this.keyDeserializer.open(keyIn); 1960 } 1961 this.valDeserializer = 1962 getDeserializer(serializationFactory, getValueClass()); 1963 if (this.valDeserializer == null) { 1964 throw new IOException( 1965 "Could not find a deserializer for the Value class: '" 1966 + getValueClass().getCanonicalName() + "'. " 1967 + "Please ensure that the configuration '" + 1968 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1969 + "properly configured, if you're using " 1970 + "custom serialization."); 1971 } 1972 this.valDeserializer.open(valIn); 1973 } 1974 } 1975 1976 @SuppressWarnings("unchecked") 1977 private Deserializer getDeserializer(SerializationFactory sf, Class c) { 1978 return sf.getDeserializer(c); 1979 } 1980 1981 /** Close the file. */ 1982 @Override 1983 public synchronized void close() throws IOException { 1984 // Return the decompressors to the pool 1985 CodecPool.returnDecompressor(keyLenDecompressor); 1986 CodecPool.returnDecompressor(keyDecompressor); 1987 CodecPool.returnDecompressor(valLenDecompressor); 1988 CodecPool.returnDecompressor(valDecompressor); 1989 keyLenDecompressor = keyDecompressor = null; 1990 valLenDecompressor = valDecompressor = null; 1991 1992 if (keyDeserializer != null) { 1993 keyDeserializer.close(); 1994 } 1995 if (valDeserializer != null) { 1996 valDeserializer.close(); 1997 } 1998 1999 // Close the input-stream 2000 in.close(); 2001 } 2002 2003 /** Returns the name of the key class. */ 2004 public String getKeyClassName() { 2005 return keyClassName; 2006 } 2007 2008 /** Returns the class of keys in this file. */ 2009 public synchronized Class<?> getKeyClass() { 2010 if (null == keyClass) { 2011 try { 2012 keyClass = WritableName.getClass(getKeyClassName(), conf); 2013 } catch (IOException e) { 2014 throw new RuntimeException(e); 2015 } 2016 } 2017 return keyClass; 2018 } 2019 2020 /** Returns the name of the value class. */ 2021 public String getValueClassName() { 2022 return valClassName; 2023 } 2024 2025 /** Returns the class of values in this file. */ 2026 public synchronized Class<?> getValueClass() { 2027 if (null == valClass) { 2028 try { 2029 valClass = WritableName.getClass(getValueClassName(), conf); 2030 } catch (IOException e) { 2031 throw new RuntimeException(e); 2032 } 2033 } 2034 return valClass; 2035 } 2036 2037 /** Returns true if values are compressed. */ 2038 public boolean isCompressed() { return decompress; } 2039 2040 /** Returns true if records are block-compressed. */ 2041 public boolean isBlockCompressed() { return blockCompressed; } 2042 2043 /** Returns the compression codec of data in this file. */ 2044 public CompressionCodec getCompressionCodec() { return codec; } 2045 2046 /** 2047 * Get the compression type for this file. 2048 * @return the compression type 2049 */ 2050 public CompressionType getCompressionType() { 2051 if (decompress) { 2052 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 2053 } else { 2054 return CompressionType.NONE; 2055 } 2056 } 2057 2058 /** Returns the metadata object of the file */ 2059 public Metadata getMetadata() { 2060 return this.metadata; 2061 } 2062 2063 /** Returns the configuration used for this file. */ 2064 Configuration getConf() { return conf; } 2065 2066 /** Read a compressed buffer */ 2067 private synchronized void readBuffer(DataInputBuffer buffer, 2068 CompressionInputStream filter) throws IOException { 2069 // Read data into a temporary buffer 2070 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 2071 2072 try { 2073 int dataBufferLength = WritableUtils.readVInt(in); 2074 dataBuffer.write(in, dataBufferLength); 2075 2076 // Set up 'buffer' connected to the input-stream 2077 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 2078 } finally { 2079 dataBuffer.close(); 2080 } 2081 2082 // Reset the codec 2083 filter.resetState(); 2084 } 2085 2086 /** Read the next 'compressed' block */ 2087 private synchronized void readBlock() throws IOException { 2088 // Check if we need to throw away a whole block of 2089 // 'values' due to 'lazy decompression' 2090 if (lazyDecompress && !valuesDecompressed) { 2091 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2092 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2093 } 2094 2095 // Reset internal states 2096 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2097 valuesDecompressed = false; 2098 2099 //Process sync 2100 if (sync != null) { 2101 in.readInt(); 2102 in.readFully(syncCheck); // read syncCheck 2103 if (!Arrays.equals(sync, syncCheck)) // check it 2104 throw new IOException("File is corrupt!"); 2105 } 2106 syncSeen = true; 2107 2108 // Read number of records in this block 2109 noBufferedRecords = WritableUtils.readVInt(in); 2110 2111 // Read key lengths and keys 2112 readBuffer(keyLenBuffer, keyLenInFilter); 2113 readBuffer(keyBuffer, keyInFilter); 2114 noBufferedKeys = noBufferedRecords; 2115 2116 // Read value lengths and values 2117 if (!lazyDecompress) { 2118 readBuffer(valLenBuffer, valLenInFilter); 2119 readBuffer(valBuffer, valInFilter); 2120 noBufferedValues = noBufferedRecords; 2121 valuesDecompressed = true; 2122 } 2123 } 2124 2125 /** 2126 * Position valLenIn/valIn to the 'value' 2127 * corresponding to the 'current' key 2128 */ 2129 private synchronized void seekToCurrentValue() throws IOException { 2130 if (!blockCompressed) { 2131 if (decompress) { 2132 valInFilter.resetState(); 2133 } 2134 valBuffer.reset(); 2135 } else { 2136 // Check if this is the first value in the 'block' to be read 2137 if (lazyDecompress && !valuesDecompressed) { 2138 // Read the value lengths and values 2139 readBuffer(valLenBuffer, valLenInFilter); 2140 readBuffer(valBuffer, valInFilter); 2141 noBufferedValues = noBufferedRecords; 2142 valuesDecompressed = true; 2143 } 2144 2145 // Calculate the no. of bytes to skip 2146 // Note: 'current' key has already been read! 2147 int skipValBytes = 0; 2148 int currentKey = noBufferedKeys + 1; 2149 for (int i=noBufferedValues; i > currentKey; --i) { 2150 skipValBytes += WritableUtils.readVInt(valLenIn); 2151 --noBufferedValues; 2152 } 2153 2154 // Skip to the 'val' corresponding to 'current' key 2155 if (skipValBytes > 0) { 2156 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2157 throw new IOException("Failed to seek to " + currentKey + 2158 "(th) value!"); 2159 } 2160 } 2161 } 2162 } 2163 2164 /** 2165 * Get the 'value' corresponding to the last read 'key'. 2166 * @param val : The 'value' to be read. 2167 * @throws IOException 2168 */ 2169 public synchronized void getCurrentValue(Writable val) 2170 throws IOException { 2171 if (val instanceof Configurable) { 2172 ((Configurable) val).setConf(this.conf); 2173 } 2174 2175 // Position stream to 'current' value 2176 seekToCurrentValue(); 2177 2178 if (!blockCompressed) { 2179 val.readFields(valIn); 2180 2181 if (valIn.read() > 0) { 2182 LOG.info("available bytes: " + valIn.available()); 2183 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2184 + " bytes, should read " + 2185 (valBuffer.getLength()-keyLength)); 2186 } 2187 } else { 2188 // Get the value 2189 int valLength = WritableUtils.readVInt(valLenIn); 2190 val.readFields(valIn); 2191 2192 // Read another compressed 'value' 2193 --noBufferedValues; 2194 2195 // Sanity check 2196 if ((valLength < 0) && LOG.isDebugEnabled()) { 2197 LOG.debug(val + " is a zero-length value"); 2198 } 2199 } 2200 2201 } 2202 2203 /** 2204 * Get the 'value' corresponding to the last read 'key'. 2205 * @param val : The 'value' to be read. 2206 * @throws IOException 2207 */ 2208 public synchronized Object getCurrentValue(Object val) 2209 throws IOException { 2210 if (val instanceof Configurable) { 2211 ((Configurable) val).setConf(this.conf); 2212 } 2213 2214 // Position stream to 'current' value 2215 seekToCurrentValue(); 2216 2217 if (!blockCompressed) { 2218 val = deserializeValue(val); 2219 2220 if (valIn.read() > 0) { 2221 LOG.info("available bytes: " + valIn.available()); 2222 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2223 + " bytes, should read " + 2224 (valBuffer.getLength()-keyLength)); 2225 } 2226 } else { 2227 // Get the value 2228 int valLength = WritableUtils.readVInt(valLenIn); 2229 val = deserializeValue(val); 2230 2231 // Read another compressed 'value' 2232 --noBufferedValues; 2233 2234 // Sanity check 2235 if ((valLength < 0) && LOG.isDebugEnabled()) { 2236 LOG.debug(val + " is a zero-length value"); 2237 } 2238 } 2239 return val; 2240 2241 } 2242 2243 @SuppressWarnings("unchecked") 2244 private Object deserializeValue(Object val) throws IOException { 2245 return valDeserializer.deserialize(val); 2246 } 2247 2248 /** Read the next key in the file into <code>key</code>, skipping its 2249 * value. True if another entry exists, and false at end of file. */ 2250 public synchronized boolean next(Writable key) throws IOException { 2251 if (key.getClass() != getKeyClass()) 2252 throw new IOException("wrong key class: "+key.getClass().getName() 2253 +" is not "+keyClass); 2254 2255 if (!blockCompressed) { 2256 outBuf.reset(); 2257 2258 keyLength = next(outBuf); 2259 if (keyLength < 0) 2260 return false; 2261 2262 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2263 2264 key.readFields(valBuffer); 2265 valBuffer.mark(0); 2266 if (valBuffer.getPosition() != keyLength) 2267 throw new IOException(key + " read " + valBuffer.getPosition() 2268 + " bytes, should read " + keyLength); 2269 } else { 2270 //Reset syncSeen 2271 syncSeen = false; 2272 2273 if (noBufferedKeys == 0) { 2274 try { 2275 readBlock(); 2276 } catch (EOFException eof) { 2277 return false; 2278 } 2279 } 2280 2281 int keyLength = WritableUtils.readVInt(keyLenIn); 2282 2283 // Sanity check 2284 if (keyLength < 0) { 2285 return false; 2286 } 2287 2288 //Read another compressed 'key' 2289 key.readFields(keyIn); 2290 --noBufferedKeys; 2291 } 2292 2293 return true; 2294 } 2295 2296 /** Read the next key/value pair in the file into <code>key</code> and 2297 * <code>val</code>. Returns true if such a pair exists and false when at 2298 * end of file */ 2299 public synchronized boolean next(Writable key, Writable val) 2300 throws IOException { 2301 if (val.getClass() != getValueClass()) 2302 throw new IOException("wrong value class: "+val+" is not "+valClass); 2303 2304 boolean more = next(key); 2305 2306 if (more) { 2307 getCurrentValue(val); 2308 } 2309 2310 return more; 2311 } 2312 2313 /** 2314 * Read and return the next record length, potentially skipping over 2315 * a sync block. 2316 * @return the length of the next record or -1 if there is no next record 2317 * @throws IOException 2318 */ 2319 private synchronized int readRecordLength() throws IOException { 2320 if (in.getPos() >= end) { 2321 return -1; 2322 } 2323 int length = in.readInt(); 2324 if (version > 1 && sync != null && 2325 length == SYNC_ESCAPE) { // process a sync entry 2326 in.readFully(syncCheck); // read syncCheck 2327 if (!Arrays.equals(sync, syncCheck)) // check it 2328 throw new IOException("File is corrupt!"); 2329 syncSeen = true; 2330 if (in.getPos() >= end) { 2331 return -1; 2332 } 2333 length = in.readInt(); // re-read length 2334 } else { 2335 syncSeen = false; 2336 } 2337 2338 return length; 2339 } 2340 2341 /** Read the next key/value pair in the file into <code>buffer</code>. 2342 * Returns the length of the key read, or -1 if at end of file. The length 2343 * of the value may be computed by calling buffer.getLength() before and 2344 * after calls to this method. */ 2345 /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ 2346 @Deprecated 2347 synchronized int next(DataOutputBuffer buffer) throws IOException { 2348 // Unsupported for block-compressed sequence files 2349 if (blockCompressed) { 2350 throw new IOException("Unsupported call for block-compressed" + 2351 " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)"); 2352 } 2353 try { 2354 int length = readRecordLength(); 2355 if (length == -1) { 2356 return -1; 2357 } 2358 int keyLength = in.readInt(); 2359 buffer.write(in, length); 2360 return keyLength; 2361 } catch (ChecksumException e) { // checksum failure 2362 handleChecksumException(e); 2363 return next(buffer); 2364 } 2365 } 2366 2367 public ValueBytes createValueBytes() { 2368 ValueBytes val = null; 2369 if (!decompress || blockCompressed) { 2370 val = new UncompressedBytes(); 2371 } else { 2372 val = new CompressedBytes(codec); 2373 } 2374 return val; 2375 } 2376 2377 /** 2378 * Read 'raw' records. 2379 * @param key - The buffer into which the key is read 2380 * @param val - The 'raw' value 2381 * @return Returns the total record length or -1 for end of file 2382 * @throws IOException 2383 */ 2384 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 2385 throws IOException { 2386 if (!blockCompressed) { 2387 int length = readRecordLength(); 2388 if (length == -1) { 2389 return -1; 2390 } 2391 int keyLength = in.readInt(); 2392 int valLength = length - keyLength; 2393 key.write(in, keyLength); 2394 if (decompress) { 2395 CompressedBytes value = (CompressedBytes)val; 2396 value.reset(in, valLength); 2397 } else { 2398 UncompressedBytes value = (UncompressedBytes)val; 2399 value.reset(in, valLength); 2400 } 2401 2402 return length; 2403 } else { 2404 //Reset syncSeen 2405 syncSeen = false; 2406 2407 // Read 'key' 2408 if (noBufferedKeys == 0) { 2409 if (in.getPos() >= end) 2410 return -1; 2411 2412 try { 2413 readBlock(); 2414 } catch (EOFException eof) { 2415 return -1; 2416 } 2417 } 2418 int keyLength = WritableUtils.readVInt(keyLenIn); 2419 if (keyLength < 0) { 2420 throw new IOException("zero length key found!"); 2421 } 2422 key.write(keyIn, keyLength); 2423 --noBufferedKeys; 2424 2425 // Read raw 'value' 2426 seekToCurrentValue(); 2427 int valLength = WritableUtils.readVInt(valLenIn); 2428 UncompressedBytes rawValue = (UncompressedBytes)val; 2429 rawValue.reset(valIn, valLength); 2430 --noBufferedValues; 2431 2432 return (keyLength+valLength); 2433 } 2434 2435 } 2436 2437 /** 2438 * Read 'raw' keys. 2439 * @param key - The buffer into which the key is read 2440 * @return Returns the key length or -1 for end of file 2441 * @throws IOException 2442 */ 2443 public synchronized int nextRawKey(DataOutputBuffer key) 2444 throws IOException { 2445 if (!blockCompressed) { 2446 recordLength = readRecordLength(); 2447 if (recordLength == -1) { 2448 return -1; 2449 } 2450 keyLength = in.readInt(); 2451 key.write(in, keyLength); 2452 return keyLength; 2453 } else { 2454 //Reset syncSeen 2455 syncSeen = false; 2456 2457 // Read 'key' 2458 if (noBufferedKeys == 0) { 2459 if (in.getPos() >= end) 2460 return -1; 2461 2462 try { 2463 readBlock(); 2464 } catch (EOFException eof) { 2465 return -1; 2466 } 2467 } 2468 int keyLength = WritableUtils.readVInt(keyLenIn); 2469 if (keyLength < 0) { 2470 throw new IOException("zero length key found!"); 2471 } 2472 key.write(keyIn, keyLength); 2473 --noBufferedKeys; 2474 2475 return keyLength; 2476 } 2477 2478 } 2479 2480 /** Read the next key in the file, skipping its 2481 * value. Return null at end of file. */ 2482 public synchronized Object next(Object key) throws IOException { 2483 if (key != null && key.getClass() != getKeyClass()) { 2484 throw new IOException("wrong key class: "+key.getClass().getName() 2485 +" is not "+keyClass); 2486 } 2487 2488 if (!blockCompressed) { 2489 outBuf.reset(); 2490 2491 keyLength = next(outBuf); 2492 if (keyLength < 0) 2493 return null; 2494 2495 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2496 2497 key = deserializeKey(key); 2498 valBuffer.mark(0); 2499 if (valBuffer.getPosition() != keyLength) 2500 throw new IOException(key + " read " + valBuffer.getPosition() 2501 + " bytes, should read " + keyLength); 2502 } else { 2503 //Reset syncSeen 2504 syncSeen = false; 2505 2506 if (noBufferedKeys == 0) { 2507 try { 2508 readBlock(); 2509 } catch (EOFException eof) { 2510 return null; 2511 } 2512 } 2513 2514 int keyLength = WritableUtils.readVInt(keyLenIn); 2515 2516 // Sanity check 2517 if (keyLength < 0) { 2518 return null; 2519 } 2520 2521 //Read another compressed 'key' 2522 key = deserializeKey(key); 2523 --noBufferedKeys; 2524 } 2525 2526 return key; 2527 } 2528 2529 @SuppressWarnings("unchecked") 2530 private Object deserializeKey(Object key) throws IOException { 2531 return keyDeserializer.deserialize(key); 2532 } 2533 2534 /** 2535 * Read 'raw' values. 2536 * @param val - The 'raw' value 2537 * @return Returns the value length 2538 * @throws IOException 2539 */ 2540 public synchronized int nextRawValue(ValueBytes val) 2541 throws IOException { 2542 2543 // Position stream to current value 2544 seekToCurrentValue(); 2545 2546 if (!blockCompressed) { 2547 int valLength = recordLength - keyLength; 2548 if (decompress) { 2549 CompressedBytes value = (CompressedBytes)val; 2550 value.reset(in, valLength); 2551 } else { 2552 UncompressedBytes value = (UncompressedBytes)val; 2553 value.reset(in, valLength); 2554 } 2555 2556 return valLength; 2557 } else { 2558 int valLength = WritableUtils.readVInt(valLenIn); 2559 UncompressedBytes rawValue = (UncompressedBytes)val; 2560 rawValue.reset(valIn, valLength); 2561 --noBufferedValues; 2562 return valLength; 2563 } 2564 2565 } 2566 2567 private void handleChecksumException(ChecksumException e) 2568 throws IOException { 2569 if (this.conf.getBoolean("io.skip.checksum.errors", false)) { 2570 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2571 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2572 } else { 2573 throw e; 2574 } 2575 } 2576 2577 /** disables sync. often invoked for tmp files */ 2578 synchronized void ignoreSync() { 2579 sync = null; 2580 } 2581 2582 /** Set the current byte position in the input file. 2583 * 2584 * <p>The position passed must be a position returned by {@link 2585 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2586 * position, use {@link SequenceFile.Reader#sync(long)}. 2587 */ 2588 public synchronized void seek(long position) throws IOException { 2589 in.seek(position); 2590 if (blockCompressed) { // trigger block read 2591 noBufferedKeys = 0; 2592 valuesDecompressed = true; 2593 } 2594 } 2595 2596 /** Seek to the next sync mark past a given position.*/ 2597 public synchronized void sync(long position) throws IOException { 2598 if (position+SYNC_SIZE >= end) { 2599 seek(end); 2600 return; 2601 } 2602 2603 if (position < headerEnd) { 2604 // seek directly to first record 2605 in.seek(headerEnd); 2606 // note the sync marker "seen" in the header 2607 syncSeen = true; 2608 return; 2609 } 2610 2611 try { 2612 seek(position+4); // skip escape 2613 in.readFully(syncCheck); 2614 int syncLen = sync.length; 2615 for (int i = 0; in.getPos() < end; i++) { 2616 int j = 0; 2617 for (; j < syncLen; j++) { 2618 if (sync[j] != syncCheck[(i+j)%syncLen]) 2619 break; 2620 } 2621 if (j == syncLen) { 2622 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2623 return; 2624 } 2625 syncCheck[i%syncLen] = in.readByte(); 2626 } 2627 } catch (ChecksumException e) { // checksum failure 2628 handleChecksumException(e); 2629 } 2630 } 2631 2632 /** Returns true iff the previous call to next passed a sync mark.*/ 2633 public synchronized boolean syncSeen() { return syncSeen; } 2634 2635 /** Return the current byte position in the input file. */ 2636 public synchronized long getPosition() throws IOException { 2637 return in.getPos(); 2638 } 2639 2640 /** Returns the name of the file. */ 2641 @Override 2642 public String toString() { 2643 return filename; 2644 } 2645 2646 } 2647 2648 /** Sorts key/value pairs in a sequence-format file. 2649 * 2650 * <p>For best performance, applications should make sure that the {@link 2651 * Writable#readFields(DataInput)} implementation of their keys is 2652 * very efficient. In particular, it should avoid allocating memory. 2653 */ 2654 public static class Sorter { 2655 2656 private RawComparator comparator; 2657 2658 private MergeSort mergeSort; //the implementation of merge sort 2659 2660 private Path[] inFiles; // when merging or sorting 2661 2662 private Path outFile; 2663 2664 private int memory; // bytes 2665 private int factor; // merged per pass 2666 2667 private FileSystem fs = null; 2668 2669 private Class keyClass; 2670 private Class valClass; 2671 2672 private Configuration conf; 2673 private Metadata metadata; 2674 2675 private Progressable progressable = null; 2676 2677 /** Sort and merge files containing the named classes. */ 2678 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2679 Class valClass, Configuration conf) { 2680 this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf); 2681 } 2682 2683 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2684 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2685 Class valClass, Configuration conf) { 2686 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2687 } 2688 2689 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2690 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2691 Class valClass, Configuration conf, Metadata metadata) { 2692 this.fs = fs; 2693 this.comparator = comparator; 2694 this.keyClass = keyClass; 2695 this.valClass = valClass; 2696 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2697 this.factor = conf.getInt("io.sort.factor", 100); 2698 this.conf = conf; 2699 this.metadata = metadata; 2700 } 2701 2702 /** Set the number of streams to merge at once.*/ 2703 public void setFactor(int factor) { this.factor = factor; } 2704 2705 /** Get the number of streams to merge at once.*/ 2706 public int getFactor() { return factor; } 2707 2708 /** Set the total amount of buffer memory, in bytes.*/ 2709 public void setMemory(int memory) { this.memory = memory; } 2710 2711 /** Get the total amount of buffer memory, in bytes.*/ 2712 public int getMemory() { return memory; } 2713 2714 /** Set the progressable object in order to report progress. */ 2715 public void setProgressable(Progressable progressable) { 2716 this.progressable = progressable; 2717 } 2718 2719 /** 2720 * Perform a file sort from a set of input files into an output file. 2721 * @param inFiles the files to be sorted 2722 * @param outFile the sorted output file 2723 * @param deleteInput should the input files be deleted as they are read? 2724 */ 2725 public void sort(Path[] inFiles, Path outFile, 2726 boolean deleteInput) throws IOException { 2727 if (fs.exists(outFile)) { 2728 throw new IOException("already exists: " + outFile); 2729 } 2730 2731 this.inFiles = inFiles; 2732 this.outFile = outFile; 2733 2734 int segments = sortPass(deleteInput); 2735 if (segments > 1) { 2736 mergePass(outFile.getParent()); 2737 } 2738 } 2739 2740 /** 2741 * Perform a file sort from a set of input files and return an iterator. 2742 * @param inFiles the files to be sorted 2743 * @param tempDir the directory where temp files are created during sort 2744 * @param deleteInput should the input files be deleted as they are read? 2745 * @return iterator the RawKeyValueIterator 2746 */ 2747 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2748 boolean deleteInput) throws IOException { 2749 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2750 if (fs.exists(outFile)) { 2751 throw new IOException("already exists: " + outFile); 2752 } 2753 this.inFiles = inFiles; 2754 //outFile will basically be used as prefix for temp files in the cases 2755 //where sort outputs multiple sorted segments. For the single segment 2756 //case, the outputFile itself will contain the sorted data for that 2757 //segment 2758 this.outFile = outFile; 2759 2760 int segments = sortPass(deleteInput); 2761 if (segments > 1) 2762 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2763 tempDir); 2764 else if (segments == 1) 2765 return merge(new Path[]{outFile}, true, tempDir); 2766 else return null; 2767 } 2768 2769 /** 2770 * The backwards compatible interface to sort. 2771 * @param inFile the input file to sort 2772 * @param outFile the sorted output file 2773 */ 2774 public void sort(Path inFile, Path outFile) throws IOException { 2775 sort(new Path[]{inFile}, outFile, false); 2776 } 2777 2778 private int sortPass(boolean deleteInput) throws IOException { 2779 if(LOG.isDebugEnabled()) { 2780 LOG.debug("running sort pass"); 2781 } 2782 SortPass sortPass = new SortPass(); // make the SortPass 2783 sortPass.setProgressable(progressable); 2784 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2785 try { 2786 return sortPass.run(deleteInput); // run it 2787 } finally { 2788 sortPass.close(); // close it 2789 } 2790 } 2791 2792 private class SortPass { 2793 private int memoryLimit = memory/4; 2794 private int recordLimit = 1000000; 2795 2796 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2797 private byte[] rawBuffer; 2798 2799 private int[] keyOffsets = new int[1024]; 2800 private int[] pointers = new int[keyOffsets.length]; 2801 private int[] pointersCopy = new int[keyOffsets.length]; 2802 private int[] keyLengths = new int[keyOffsets.length]; 2803 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2804 2805 private ArrayList segmentLengths = new ArrayList(); 2806 2807 private Reader in = null; 2808 private FSDataOutputStream out = null; 2809 private FSDataOutputStream indexOut = null; 2810 private Path outName; 2811 2812 private Progressable progressable = null; 2813 2814 public int run(boolean deleteInput) throws IOException { 2815 int segments = 0; 2816 int currentFile = 0; 2817 boolean atEof = (currentFile >= inFiles.length); 2818 CompressionType compressionType; 2819 CompressionCodec codec = null; 2820 segmentLengths.clear(); 2821 if (atEof) { 2822 return 0; 2823 } 2824 2825 // Initialize 2826 in = new Reader(fs, inFiles[currentFile], conf); 2827 compressionType = in.getCompressionType(); 2828 codec = in.getCompressionCodec(); 2829 2830 for (int i=0; i < rawValues.length; ++i) { 2831 rawValues[i] = null; 2832 } 2833 2834 while (!atEof) { 2835 int count = 0; 2836 int bytesProcessed = 0; 2837 rawKeys.reset(); 2838 while (!atEof && 2839 bytesProcessed < memoryLimit && count < recordLimit) { 2840 2841 // Read a record into buffer 2842 // Note: Attempt to re-use 'rawValue' as far as possible 2843 int keyOffset = rawKeys.getLength(); 2844 ValueBytes rawValue = 2845 (count == keyOffsets.length || rawValues[count] == null) ? 2846 in.createValueBytes() : 2847 rawValues[count]; 2848 int recordLength = in.nextRaw(rawKeys, rawValue); 2849 if (recordLength == -1) { 2850 in.close(); 2851 if (deleteInput) { 2852 fs.delete(inFiles[currentFile], true); 2853 } 2854 currentFile += 1; 2855 atEof = currentFile >= inFiles.length; 2856 if (!atEof) { 2857 in = new Reader(fs, inFiles[currentFile], conf); 2858 } else { 2859 in = null; 2860 } 2861 continue; 2862 } 2863 2864 int keyLength = rawKeys.getLength() - keyOffset; 2865 2866 if (count == keyOffsets.length) 2867 grow(); 2868 2869 keyOffsets[count] = keyOffset; // update pointers 2870 pointers[count] = count; 2871 keyLengths[count] = keyLength; 2872 rawValues[count] = rawValue; 2873 2874 bytesProcessed += recordLength; 2875 count++; 2876 } 2877 2878 // buffer is full -- sort & flush it 2879 if(LOG.isDebugEnabled()) { 2880 LOG.debug("flushing segment " + segments); 2881 } 2882 rawBuffer = rawKeys.getData(); 2883 sort(count); 2884 // indicate we're making progress 2885 if (progressable != null) { 2886 progressable.progress(); 2887 } 2888 flush(count, bytesProcessed, compressionType, codec, 2889 segments==0 && atEof); 2890 segments++; 2891 } 2892 return segments; 2893 } 2894 2895 public void close() throws IOException { 2896 if (in != null) { 2897 in.close(); 2898 } 2899 if (out != null) { 2900 out.close(); 2901 } 2902 if (indexOut != null) { 2903 indexOut.close(); 2904 } 2905 } 2906 2907 private void grow() { 2908 int newLength = keyOffsets.length * 3 / 2; 2909 keyOffsets = grow(keyOffsets, newLength); 2910 pointers = grow(pointers, newLength); 2911 pointersCopy = new int[newLength]; 2912 keyLengths = grow(keyLengths, newLength); 2913 rawValues = grow(rawValues, newLength); 2914 } 2915 2916 private int[] grow(int[] old, int newLength) { 2917 int[] result = new int[newLength]; 2918 System.arraycopy(old, 0, result, 0, old.length); 2919 return result; 2920 } 2921 2922 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 2923 ValueBytes[] result = new ValueBytes[newLength]; 2924 System.arraycopy(old, 0, result, 0, old.length); 2925 for (int i=old.length; i < newLength; ++i) { 2926 result[i] = null; 2927 } 2928 return result; 2929 } 2930 2931 private void flush(int count, int bytesProcessed, 2932 CompressionType compressionType, 2933 CompressionCodec codec, 2934 boolean done) throws IOException { 2935 if (out == null) { 2936 outName = done ? outFile : outFile.suffix(".0"); 2937 out = fs.create(outName); 2938 if (!done) { 2939 indexOut = fs.create(outName.suffix(".index")); 2940 } 2941 } 2942 2943 long segmentStart = out.getPos(); 2944 Writer writer = createWriter(conf, Writer.stream(out), 2945 Writer.keyClass(keyClass), Writer.valueClass(valClass), 2946 Writer.compression(compressionType, codec), 2947 Writer.metadata(done ? metadata : new Metadata())); 2948 2949 if (!done) { 2950 writer.sync = null; // disable sync on temp files 2951 } 2952 2953 for (int i = 0; i < count; i++) { // write in sorted order 2954 int p = pointers[i]; 2955 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); 2956 } 2957 writer.close(); 2958 2959 if (!done) { 2960 // Save the segment length 2961 WritableUtils.writeVLong(indexOut, segmentStart); 2962 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart)); 2963 indexOut.flush(); 2964 } 2965 } 2966 2967 private void sort(int count) { 2968 System.arraycopy(pointers, 0, pointersCopy, 0, count); 2969 mergeSort.mergeSort(pointersCopy, pointers, 0, count); 2970 } 2971 class SeqFileComparator implements Comparator<IntWritable> { 2972 @Override 2973 public int compare(IntWritable I, IntWritable J) { 2974 return comparator.compare(rawBuffer, keyOffsets[I.get()], 2975 keyLengths[I.get()], rawBuffer, 2976 keyOffsets[J.get()], keyLengths[J.get()]); 2977 } 2978 } 2979 2980 /** set the progressable object in order to report progress */ 2981 public void setProgressable(Progressable progressable) 2982 { 2983 this.progressable = progressable; 2984 } 2985 2986 } // SequenceFile.Sorter.SortPass 2987 2988 /** The interface to iterate over raw keys/values of SequenceFiles. */ 2989 public static interface RawKeyValueIterator { 2990 /** Gets the current raw key 2991 * @return DataOutputBuffer 2992 * @throws IOException 2993 */ 2994 DataOutputBuffer getKey() throws IOException; 2995 /** Gets the current raw value 2996 * @return ValueBytes 2997 * @throws IOException 2998 */ 2999 ValueBytes getValue() throws IOException; 3000 /** Sets up the current key and value (for getKey and getValue) 3001 * @return true if there exists a key/value, false otherwise 3002 * @throws IOException 3003 */ 3004 boolean next() throws IOException; 3005 /** closes the iterator so that the underlying streams can be closed 3006 * @throws IOException 3007 */ 3008 void close() throws IOException; 3009 /** Gets the Progress object; this has a float (0.0 - 1.0) 3010 * indicating the bytes processed by the iterator so far 3011 */ 3012 Progress getProgress(); 3013 } 3014 3015 /** 3016 * Merges the list of segments of type <code>SegmentDescriptor</code> 3017 * @param segments the list of SegmentDescriptors 3018 * @param tmpDir the directory to write temporary files into 3019 * @return RawKeyValueIterator 3020 * @throws IOException 3021 */ 3022 public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 3023 Path tmpDir) 3024 throws IOException { 3025 // pass in object to report progress, if present 3026 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); 3027 return mQueue.merge(); 3028 } 3029 3030 /** 3031 * Merges the contents of files passed in Path[] using a max factor value 3032 * that is already set 3033 * @param inNames the array of path names 3034 * @param deleteInputs true if the input files should be deleted when 3035 * unnecessary 3036 * @param tmpDir the directory to write temporary files into 3037 * @return RawKeyValueIteratorMergeQueue 3038 * @throws IOException 3039 */ 3040 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3041 Path tmpDir) 3042 throws IOException { 3043 return merge(inNames, deleteInputs, 3044 (inNames.length < factor) ? inNames.length : factor, 3045 tmpDir); 3046 } 3047 3048 /** 3049 * Merges the contents of files passed in Path[] 3050 * @param inNames the array of path names 3051 * @param deleteInputs true if the input files should be deleted when 3052 * unnecessary 3053 * @param factor the factor that will be used as the maximum merge fan-in 3054 * @param tmpDir the directory to write temporary files into 3055 * @return RawKeyValueIteratorMergeQueue 3056 * @throws IOException 3057 */ 3058 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3059 int factor, Path tmpDir) 3060 throws IOException { 3061 //get the segments from inNames 3062 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3063 for (int i = 0; i < inNames.length; i++) { 3064 SegmentDescriptor s = new SegmentDescriptor(0, 3065 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3066 s.preserveInput(!deleteInputs); 3067 s.doSync(); 3068 a.add(s); 3069 } 3070 this.factor = factor; 3071 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); 3072 return mQueue.merge(); 3073 } 3074 3075 /** 3076 * Merges the contents of files passed in Path[] 3077 * @param inNames the array of path names 3078 * @param tempDir the directory for creating temp files during merge 3079 * @param deleteInputs true if the input files should be deleted when 3080 * unnecessary 3081 * @return RawKeyValueIteratorMergeQueue 3082 * @throws IOException 3083 */ 3084 public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 3085 boolean deleteInputs) 3086 throws IOException { 3087 //outFile will basically be used as prefix for temp files for the 3088 //intermediate merge outputs 3089 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged"); 3090 //get the segments from inNames 3091 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3092 for (int i = 0; i < inNames.length; i++) { 3093 SegmentDescriptor s = new SegmentDescriptor(0, 3094 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3095 s.preserveInput(!deleteInputs); 3096 s.doSync(); 3097 a.add(s); 3098 } 3099 factor = (inNames.length < factor) ? inNames.length : factor; 3100 // pass in object to report progress, if present 3101 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); 3102 return mQueue.merge(); 3103 } 3104 3105 /** 3106 * Clones the attributes (like compression of the input file and creates a 3107 * corresponding Writer 3108 * @param inputFile the path of the input file whose attributes should be 3109 * cloned 3110 * @param outputFile the path of the output file 3111 * @param prog the Progressable to report status during the file write 3112 * @return Writer 3113 * @throws IOException 3114 */ 3115 public Writer cloneFileAttributes(Path inputFile, Path outputFile, 3116 Progressable prog) throws IOException { 3117 Reader reader = new Reader(conf, 3118 Reader.file(inputFile), 3119 new Reader.OnlyHeaderOption()); 3120 CompressionType compress = reader.getCompressionType(); 3121 CompressionCodec codec = reader.getCompressionCodec(); 3122 reader.close(); 3123 3124 Writer writer = createWriter(conf, 3125 Writer.file(outputFile), 3126 Writer.keyClass(keyClass), 3127 Writer.valueClass(valClass), 3128 Writer.compression(compress, codec), 3129 Writer.progressable(prog)); 3130 return writer; 3131 } 3132 3133 /** 3134 * Writes records from RawKeyValueIterator into a file represented by the 3135 * passed writer 3136 * @param records the RawKeyValueIterator 3137 * @param writer the Writer created earlier 3138 * @throws IOException 3139 */ 3140 public void writeFile(RawKeyValueIterator records, Writer writer) 3141 throws IOException { 3142 while(records.next()) { 3143 writer.appendRaw(records.getKey().getData(), 0, 3144 records.getKey().getLength(), records.getValue()); 3145 } 3146 writer.sync(); 3147 } 3148 3149 /** Merge the provided files. 3150 * @param inFiles the array of input path names 3151 * @param outFile the final output file 3152 * @throws IOException 3153 */ 3154 public void merge(Path[] inFiles, Path outFile) throws IOException { 3155 if (fs.exists(outFile)) { 3156 throw new IOException("already exists: " + outFile); 3157 } 3158 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); 3159 Writer writer = cloneFileAttributes(inFiles[0], outFile, null); 3160 3161 writeFile(r, writer); 3162 3163 writer.close(); 3164 } 3165 3166 /** sort calls this to generate the final merged output */ 3167 private int mergePass(Path tmpDir) throws IOException { 3168 if(LOG.isDebugEnabled()) { 3169 LOG.debug("running merge pass"); 3170 } 3171 Writer writer = cloneFileAttributes( 3172 outFile.suffix(".0"), outFile, null); 3173 RawKeyValueIterator r = merge(outFile.suffix(".0"), 3174 outFile.suffix(".0.index"), tmpDir); 3175 writeFile(r, writer); 3176 3177 writer.close(); 3178 return 0; 3179 } 3180 3181 /** Used by mergePass to merge the output of the sort 3182 * @param inName the name of the input file containing sorted segments 3183 * @param indexIn the offsets of the sorted segments 3184 * @param tmpDir the relative directory to store intermediate results in 3185 * @return RawKeyValueIterator 3186 * @throws IOException 3187 */ 3188 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 3189 throws IOException { 3190 //get the segments from indexIn 3191 //we create a SegmentContainer so that we can track segments belonging to 3192 //inName and delete inName as soon as we see that we have looked at all 3193 //the contained segments during the merge process & hence don't need 3194 //them anymore 3195 SegmentContainer container = new SegmentContainer(inName, indexIn); 3196 MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable); 3197 return mQueue.merge(); 3198 } 3199 3200 /** This class implements the core of the merge logic */ 3201 private class MergeQueue extends PriorityQueue 3202 implements RawKeyValueIterator { 3203 private boolean compress; 3204 private boolean blockCompress; 3205 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3206 private ValueBytes rawValue; 3207 private long totalBytesProcessed; 3208 private float progPerByte; 3209 private Progress mergeProgress = new Progress(); 3210 private Path tmpDir; 3211 private Progressable progress = null; //handle to the progress reporting object 3212 private SegmentDescriptor minSegment; 3213 3214 //a TreeMap used to store the segments sorted by size (segment offset and 3215 //segment path name is used to break ties between segments of same sizes) 3216 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3217 new TreeMap<SegmentDescriptor, Void>(); 3218 3219 @SuppressWarnings("unchecked") 3220 public void put(SegmentDescriptor stream) throws IOException { 3221 if (size() == 0) { 3222 compress = stream.in.isCompressed(); 3223 blockCompress = stream.in.isBlockCompressed(); 3224 } else if (compress != stream.in.isCompressed() || 3225 blockCompress != stream.in.isBlockCompressed()) { 3226 throw new IOException("All merged files must be compressed or not."); 3227 } 3228 super.put(stream); 3229 } 3230 3231 /** 3232 * A queue of file segments to merge 3233 * @param segments the file segments to merge 3234 * @param tmpDir a relative local directory to save intermediate files in 3235 * @param progress the reference to the Progressable object 3236 */ 3237 public MergeQueue(List <SegmentDescriptor> segments, 3238 Path tmpDir, Progressable progress) { 3239 int size = segments.size(); 3240 for (int i = 0; i < size; i++) { 3241 sortedSegmentSizes.put(segments.get(i), null); 3242 } 3243 this.tmpDir = tmpDir; 3244 this.progress = progress; 3245 } 3246 @Override 3247 protected boolean lessThan(Object a, Object b) { 3248 // indicate we're making progress 3249 if (progress != null) { 3250 progress.progress(); 3251 } 3252 SegmentDescriptor msa = (SegmentDescriptor)a; 3253 SegmentDescriptor msb = (SegmentDescriptor)b; 3254 return comparator.compare(msa.getKey().getData(), 0, 3255 msa.getKey().getLength(), msb.getKey().getData(), 0, 3256 msb.getKey().getLength()) < 0; 3257 } 3258 @Override 3259 public void close() throws IOException { 3260 SegmentDescriptor ms; // close inputs 3261 while ((ms = (SegmentDescriptor)pop()) != null) { 3262 ms.cleanup(); 3263 } 3264 minSegment = null; 3265 } 3266 @Override 3267 public DataOutputBuffer getKey() throws IOException { 3268 return rawKey; 3269 } 3270 @Override 3271 public ValueBytes getValue() throws IOException { 3272 return rawValue; 3273 } 3274 @Override 3275 public boolean next() throws IOException { 3276 if (size() == 0) 3277 return false; 3278 if (minSegment != null) { 3279 //minSegment is non-null for all invocations of next except the first 3280 //one. For the first invocation, the priority queue is ready for use 3281 //but for the subsequent invocations, first adjust the queue 3282 adjustPriorityQueue(minSegment); 3283 if (size() == 0) { 3284 minSegment = null; 3285 return false; 3286 } 3287 } 3288 minSegment = (SegmentDescriptor)top(); 3289 long startPos = minSegment.in.getPosition(); // Current position in stream 3290 //save the raw key reference 3291 rawKey = minSegment.getKey(); 3292 //load the raw value. Re-use the existing rawValue buffer 3293 if (rawValue == null) { 3294 rawValue = minSegment.in.createValueBytes(); 3295 } 3296 minSegment.nextRawValue(rawValue); 3297 long endPos = minSegment.in.getPosition(); // End position after reading value 3298 updateProgress(endPos - startPos); 3299 return true; 3300 } 3301 3302 @Override 3303 public Progress getProgress() { 3304 return mergeProgress; 3305 } 3306 3307 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3308 long startPos = ms.in.getPosition(); // Current position in stream 3309 boolean hasNext = ms.nextRawKey(); 3310 long endPos = ms.in.getPosition(); // End position after reading key 3311 updateProgress(endPos - startPos); 3312 if (hasNext) { 3313 adjustTop(); 3314 } else { 3315 pop(); 3316 ms.cleanup(); 3317 } 3318 } 3319 3320 private void updateProgress(long bytesProcessed) { 3321 totalBytesProcessed += bytesProcessed; 3322 if (progPerByte > 0) { 3323 mergeProgress.set(totalBytesProcessed * progPerByte); 3324 } 3325 } 3326 3327 /** This is the single level merge that is called multiple times 3328 * depending on the factor size and the number of segments 3329 * @return RawKeyValueIterator 3330 * @throws IOException 3331 */ 3332 public RawKeyValueIterator merge() throws IOException { 3333 //create the MergeStreams from the sorted map created in the constructor 3334 //and dump the final output to a file 3335 int numSegments = sortedSegmentSizes.size(); 3336 int origFactor = factor; 3337 int passNo = 1; 3338 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3339 do { 3340 //get the factor for this pass of merge 3341 factor = getPassFactor(passNo, numSegments); 3342 List<SegmentDescriptor> segmentsToMerge = 3343 new ArrayList<SegmentDescriptor>(); 3344 int segmentsConsidered = 0; 3345 int numSegmentsToConsider = factor; 3346 while (true) { 3347 //extract the smallest 'factor' number of segment pointers from the 3348 //TreeMap. Call cleanup on the empty segments (no key/value data) 3349 SegmentDescriptor[] mStream = 3350 getSegmentDescriptors(numSegmentsToConsider); 3351 for (int i = 0; i < mStream.length; i++) { 3352 if (mStream[i].nextRawKey()) { 3353 segmentsToMerge.add(mStream[i]); 3354 segmentsConsidered++; 3355 // Count the fact that we read some bytes in calling nextRawKey() 3356 updateProgress(mStream[i].in.getPosition()); 3357 } 3358 else { 3359 mStream[i].cleanup(); 3360 numSegments--; //we ignore this segment for the merge 3361 } 3362 } 3363 //if we have the desired number of segments 3364 //or looked at all available segments, we break 3365 if (segmentsConsidered == factor || 3366 sortedSegmentSizes.size() == 0) { 3367 break; 3368 } 3369 3370 numSegmentsToConsider = factor - segmentsConsidered; 3371 } 3372 //feed the streams to the priority queue 3373 initialize(segmentsToMerge.size()); clear(); 3374 for (int i = 0; i < segmentsToMerge.size(); i++) { 3375 put(segmentsToMerge.get(i)); 3376 } 3377 //if we have lesser number of segments remaining, then just return the 3378 //iterator, else do another single level merge 3379 if (numSegments <= factor) { 3380 //calculate the length of the remaining segments. Required for 3381 //calculating the merge progress 3382 long totalBytes = 0; 3383 for (int i = 0; i < segmentsToMerge.size(); i++) { 3384 totalBytes += segmentsToMerge.get(i).segmentLength; 3385 } 3386 if (totalBytes != 0) //being paranoid 3387 progPerByte = 1.0f / (float)totalBytes; 3388 //reset factor to what it originally was 3389 factor = origFactor; 3390 return this; 3391 } else { 3392 //we want to spread the creation of temp files on multiple disks if 3393 //available under the space constraints 3394 long approxOutputSize = 0; 3395 for (SegmentDescriptor s : segmentsToMerge) { 3396 approxOutputSize += s.segmentLength + 3397 ChecksumFileSystem.getApproxChkSumLength( 3398 s.segmentLength); 3399 } 3400 Path tmpFilename = 3401 new Path(tmpDir, "intermediate").suffix("." + passNo); 3402 3403 Path outputFile = lDirAlloc.getLocalPathForWrite( 3404 tmpFilename.toString(), 3405 approxOutputSize, conf); 3406 if(LOG.isDebugEnabled()) { 3407 LOG.debug("writing intermediate results to " + outputFile); 3408 } 3409 Writer writer = cloneFileAttributes( 3410 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3411 fs.makeQualified(outputFile), null); 3412 writer.sync = null; //disable sync for temp files 3413 writeFile(this, writer); 3414 writer.close(); 3415 3416 //we finished one single level merge; now clean up the priority 3417 //queue 3418 this.close(); 3419 3420 SegmentDescriptor tempSegment = 3421 new SegmentDescriptor(0, 3422 fs.getFileStatus(outputFile).getLen(), outputFile); 3423 //put the segment back in the TreeMap 3424 sortedSegmentSizes.put(tempSegment, null); 3425 numSegments = sortedSegmentSizes.size(); 3426 passNo++; 3427 } 3428 //we are worried about only the first pass merge factor. So reset the 3429 //factor to what it originally was 3430 factor = origFactor; 3431 } while(true); 3432 } 3433 3434 //Hadoop-591 3435 public int getPassFactor(int passNo, int numSegments) { 3436 if (passNo > 1 || numSegments <= factor || factor == 1) 3437 return factor; 3438 int mod = (numSegments - 1) % (factor - 1); 3439 if (mod == 0) 3440 return factor; 3441 return mod + 1; 3442 } 3443 3444 /** Return (& remove) the requested number of segment descriptors from the 3445 * sorted map. 3446 */ 3447 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3448 if (numDescriptors > sortedSegmentSizes.size()) 3449 numDescriptors = sortedSegmentSizes.size(); 3450 SegmentDescriptor[] SegmentDescriptors = 3451 new SegmentDescriptor[numDescriptors]; 3452 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3453 int i = 0; 3454 while (i < numDescriptors) { 3455 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3456 iter.remove(); 3457 } 3458 return SegmentDescriptors; 3459 } 3460 } // SequenceFile.Sorter.MergeQueue 3461 3462 /** This class defines a merge segment. This class can be subclassed to 3463 * provide a customized cleanup method implementation. In this 3464 * implementation, cleanup closes the file handle and deletes the file 3465 */ 3466 public class SegmentDescriptor implements Comparable { 3467 3468 long segmentOffset; //the start of the segment in the file 3469 long segmentLength; //the length of the segment 3470 Path segmentPathName; //the path name of the file containing the segment 3471 boolean ignoreSync = true; //set to true for temp files 3472 private Reader in = null; 3473 private DataOutputBuffer rawKey = null; //this will hold the current key 3474 private boolean preserveInput = false; //delete input segment files? 3475 3476 /** Constructs a segment 3477 * @param segmentOffset the offset of the segment in the file 3478 * @param segmentLength the length of the segment 3479 * @param segmentPathName the path name of the file containing the segment 3480 */ 3481 public SegmentDescriptor (long segmentOffset, long segmentLength, 3482 Path segmentPathName) { 3483 this.segmentOffset = segmentOffset; 3484 this.segmentLength = segmentLength; 3485 this.segmentPathName = segmentPathName; 3486 } 3487 3488 /** Do the sync checks */ 3489 public void doSync() {ignoreSync = false;} 3490 3491 /** Whether to delete the files when no longer needed */ 3492 public void preserveInput(boolean preserve) { 3493 preserveInput = preserve; 3494 } 3495 3496 public boolean shouldPreserveInput() { 3497 return preserveInput; 3498 } 3499 3500 @Override 3501 public int compareTo(Object o) { 3502 SegmentDescriptor that = (SegmentDescriptor)o; 3503 if (this.segmentLength != that.segmentLength) { 3504 return (this.segmentLength < that.segmentLength ? -1 : 1); 3505 } 3506 if (this.segmentOffset != that.segmentOffset) { 3507 return (this.segmentOffset < that.segmentOffset ? -1 : 1); 3508 } 3509 return (this.segmentPathName.toString()). 3510 compareTo(that.segmentPathName.toString()); 3511 } 3512 3513 @Override 3514 public boolean equals(Object o) { 3515 if (!(o instanceof SegmentDescriptor)) { 3516 return false; 3517 } 3518 SegmentDescriptor that = (SegmentDescriptor)o; 3519 if (this.segmentLength == that.segmentLength && 3520 this.segmentOffset == that.segmentOffset && 3521 this.segmentPathName.toString().equals( 3522 that.segmentPathName.toString())) { 3523 return true; 3524 } 3525 return false; 3526 } 3527 3528 @Override 3529 public int hashCode() { 3530 return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32)); 3531 } 3532 3533 /** Fills up the rawKey object with the key returned by the Reader 3534 * @return true if there is a key returned; false, otherwise 3535 * @throws IOException 3536 */ 3537 public boolean nextRawKey() throws IOException { 3538 if (in == null) { 3539 int bufferSize = getBufferSize(conf); 3540 Reader reader = new Reader(conf, 3541 Reader.file(segmentPathName), 3542 Reader.bufferSize(bufferSize), 3543 Reader.start(segmentOffset), 3544 Reader.length(segmentLength)); 3545 3546 //sometimes we ignore syncs especially for temp merge files 3547 if (ignoreSync) reader.ignoreSync(); 3548 3549 if (reader.getKeyClass() != keyClass) 3550 throw new IOException("wrong key class: " + reader.getKeyClass() + 3551 " is not " + keyClass); 3552 if (reader.getValueClass() != valClass) 3553 throw new IOException("wrong value class: "+reader.getValueClass()+ 3554 " is not " + valClass); 3555 this.in = reader; 3556 rawKey = new DataOutputBuffer(); 3557 } 3558 rawKey.reset(); 3559 int keyLength = 3560 in.nextRawKey(rawKey); 3561 return (keyLength >= 0); 3562 } 3563 3564 /** Fills up the passed rawValue with the value corresponding to the key 3565 * read earlier 3566 * @param rawValue 3567 * @return the length of the value 3568 * @throws IOException 3569 */ 3570 public int nextRawValue(ValueBytes rawValue) throws IOException { 3571 int valLength = in.nextRawValue(rawValue); 3572 return valLength; 3573 } 3574 3575 /** Returns the stored rawKey */ 3576 public DataOutputBuffer getKey() { 3577 return rawKey; 3578 } 3579 3580 /** closes the underlying reader */ 3581 private void close() throws IOException { 3582 this.in.close(); 3583 this.in = null; 3584 } 3585 3586 /** The default cleanup. Subclasses can override this with a custom 3587 * cleanup 3588 */ 3589 public void cleanup() throws IOException { 3590 close(); 3591 if (!preserveInput) { 3592 fs.delete(segmentPathName, true); 3593 } 3594 } 3595 } // SequenceFile.Sorter.SegmentDescriptor 3596 3597 /** This class provisions multiple segments contained within a single 3598 * file 3599 */ 3600 private class LinkedSegmentsDescriptor extends SegmentDescriptor { 3601 3602 SegmentContainer parentContainer = null; 3603 3604 /** Constructs a segment 3605 * @param segmentOffset the offset of the segment in the file 3606 * @param segmentLength the length of the segment 3607 * @param segmentPathName the path name of the file containing the segment 3608 * @param parent the parent SegmentContainer that holds the segment 3609 */ 3610 public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 3611 Path segmentPathName, SegmentContainer parent) { 3612 super(segmentOffset, segmentLength, segmentPathName); 3613 this.parentContainer = parent; 3614 } 3615 /** The default cleanup. Subclasses can override this with a custom 3616 * cleanup 3617 */ 3618 @Override 3619 public void cleanup() throws IOException { 3620 super.close(); 3621 if (super.shouldPreserveInput()) return; 3622 parentContainer.cleanup(); 3623 } 3624 3625 @Override 3626 public boolean equals(Object o) { 3627 if (!(o instanceof LinkedSegmentsDescriptor)) { 3628 return false; 3629 } 3630 return super.equals(o); 3631 } 3632 } //SequenceFile.Sorter.LinkedSegmentsDescriptor 3633 3634 /** The class that defines a container for segments to be merged. Primarily 3635 * required to delete temp files as soon as all the contained segments 3636 * have been looked at */ 3637 private class SegmentContainer { 3638 private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups 3639 private int numSegmentsContained; //# of segments contained 3640 private Path inName; //input file from where segments are created 3641 3642 //the list of segments read from the file 3643 private ArrayList <SegmentDescriptor> segments = 3644 new ArrayList <SegmentDescriptor>(); 3645 /** This constructor is there primarily to serve the sort routine that 3646 * generates a single output file with an associated index file */ 3647 public SegmentContainer(Path inName, Path indexIn) throws IOException { 3648 //get the segments from indexIn 3649 FSDataInputStream fsIndexIn = fs.open(indexIn); 3650 long end = fs.getFileStatus(indexIn).getLen(); 3651 while (fsIndexIn.getPos() < end) { 3652 long segmentOffset = WritableUtils.readVLong(fsIndexIn); 3653 long segmentLength = WritableUtils.readVLong(fsIndexIn); 3654 Path segmentName = inName; 3655 segments.add(new LinkedSegmentsDescriptor(segmentOffset, 3656 segmentLength, segmentName, this)); 3657 } 3658 fsIndexIn.close(); 3659 fs.delete(indexIn, true); 3660 numSegmentsContained = segments.size(); 3661 this.inName = inName; 3662 } 3663 3664 public List <SegmentDescriptor> getSegmentList() { 3665 return segments; 3666 } 3667 public void cleanup() throws IOException { 3668 numSegmentsCleanedUp++; 3669 if (numSegmentsCleanedUp == numSegmentsContained) { 3670 fs.delete(inName, true); 3671 } 3672 } 3673 } //SequenceFile.Sorter.SegmentContainer 3674 3675 } // SequenceFile.Sorter 3676 3677 } // SequenceFile