/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
 * {@link Sorter} classes for writing, reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can
 * be specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link Reader} acts as the bridge and can read any of the above
 * <code>SequenceFile</code> formats.</p>
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every few <code>100</code> bytes or so.
 *   </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every few <code>100</code> bytes or so.
 *   </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every block.
 *   </li>
 * </ul>
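 *
 * <p>For example, a minimal sketch of writing a file through the recommended
 * <tt>createWriter</tt> factory (the path and the key/value types here are
 * illustrative assumptions, not prescribed):</p>
 *
 * <pre>
 * Configuration conf = new Configuration();
 * Path path = new Path("/tmp/example.seq");   // hypothetical location
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(IntWritable.class),
 *     SequenceFile.Writer.valueClass(Text.class),
 *     SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK));
 * try {
 *   writer.append(new IntWritable(1), new Text("one"));
 * } finally {
 *   writer.close();
 * }
 * </pre>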
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
 * format.</p>
 *
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points.*/
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the compression type for the reduce outputs
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  static public CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
                          CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  static public void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileContext fc, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata,
                 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
    throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
        keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *   instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }


  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream.
     * Note: this will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
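   *
   * <p>A sketch of attaching metadata when creating a file (the attribute
   * strings and the <code>conf</code>/<code>path</code> variables are
   * illustrative assumptions):</p>
   * <pre>
   * SequenceFile.Metadata meta = new SequenceFile.Metadata();
   * meta.set(new Text("created.by"), new Text("example-job"));
   * SequenceFile.Writer w = SequenceFile.createWriter(conf,
   *     SequenceFile.Writer.file(path),
   *     SequenceFile.Writer.keyClass(Text.class),
   *     SequenceFile.Writer.valueClass(Text.class),
   *     SequenceFile.Writer.metadata(meta));
   * </pre>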
   *
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0)
        throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString())
          .append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }

  /** Write key/value pairs to a sequence-format file.
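   *
   * <p>The writer periodically embeds a 16-byte sync marker in the output
   * (roughly every {@link SequenceFile#SYNC_INTERVAL} bytes) so that a
   * {@link Reader} can seek to an arbitrary position and resynchronize at
   * the next record boundary.</p>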
   */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                            implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     *     methods that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                  implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     *     methods that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param options the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException("file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();
        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
" 1190 + "Please ensure that the configuration '" + 1191 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1192 + "properly configured, if you're using" 1193 + "custom serialization."); 1194 } 1195 this.uncompressedValSerializer.open(buffer); 1196 if (this.codec != null) { 1197 ReflectionUtils.setConf(this.codec, this.conf); 1198 this.compressor = CodecPool.getCompressor(this.codec); 1199 this.deflateFilter = this.codec.createOutputStream(buffer, compressor); 1200 this.deflateOut = 1201 new DataOutputStream(new BufferedOutputStream(deflateFilter)); 1202 this.compressedValSerializer = serializationFactory.getSerializer(valClass); 1203 if (this.compressedValSerializer == null) { 1204 throw new IOException( 1205 "Could not find a serializer for the Value class: '" 1206 + valClass.getCanonicalName() + "'. " 1207 + "Please ensure that the configuration '" + 1208 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1209 + "properly configured, if you're using" 1210 + "custom serialization."); 1211 } 1212 this.compressedValSerializer.open(deflateOut); 1213 } 1214 writeFileHeader(); 1215 } 1216 1217 /** Returns the class of keys in this file. */ 1218 public Class getKeyClass() { return keyClass; } 1219 1220 /** Returns the class of values in this file. */ 1221 public Class getValueClass() { return valClass; } 1222 1223 /** Returns the compression codec of data in this file. */ 1224 public CompressionCodec getCompressionCodec() { return codec; } 1225 1226 /** create a sync point */ 1227 public void sync() throws IOException { 1228 if (sync != null && lastSyncPos != out.getPos()) { 1229 out.writeInt(SYNC_ESCAPE); // mark the start of the sync 1230 out.write(sync); // write sync 1231 lastSyncPos = out.getPos(); // update lastSyncPos 1232 } 1233 } 1234 1235 /** 1236 * flush all currently written data to the file system 1237 * @deprecated Use {@link #hsync()} or {@link #hflush()} instead 1238 */ 1239 @Deprecated 1240 public void syncFs() throws IOException { 1241 if (out != null) { 1242 out.sync(); // flush contents to file system 1243 } 1244 } 1245 1246 @Override 1247 public void hsync() throws IOException { 1248 if (out != null) { 1249 out.hsync(); 1250 } 1251 } 1252 1253 @Override 1254 public void hflush() throws IOException { 1255 if (out != null) { 1256 out.hflush(); 1257 } 1258 } 1259 1260 /** Returns the configuration of this file. */ 1261 Configuration getConf() { return conf; } 1262 1263 /** Close the file. */ 1264 @Override 1265 public synchronized void close() throws IOException { 1266 keySerializer.close(); 1267 uncompressedValSerializer.close(); 1268 if (compressedValSerializer != null) { 1269 compressedValSerializer.close(); 1270 } 1271 1272 CodecPool.returnCompressor(compressor); 1273 compressor = null; 1274 1275 if (out != null) { 1276 1277 // Close the underlying stream iff we own it... 1278 if (ownOutputStream) { 1279 out.close(); 1280 } else { 1281 out.flush(); 1282 } 1283 out = null; 1284 } 1285 } 1286 1287 synchronized void checkAndWriteSync() throws IOException { 1288 if (sync != null && 1289 out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync 1290 sync(); 1291 } 1292 } 1293 1294 /** Append a key/value pair. */ 1295 public void append(Writable key, Writable val) 1296 throws IOException { 1297 append((Object) key, (Object) val); 1298 } 1299 1300 /** Append a key/value pair. 
     */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position. In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a
     * position returned by this method, {@link SequenceFile.Reader#next(Writable)}
     * may be called. However the key may be earlier in the file than the key
     * last written when this method was called (e.g., with block-compression,
     * it may be the first key in the block that was being written when this
     * method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }

  } // class Writer

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair.
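     * <p>Same record layout as {@link Writer#append(Object, Object)}, except
     * that the value bytes are always passed through the configured codec
     * before being written.</p>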
     */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;

    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize =
        conf.getInt("io.seqfile.compress.blocksize", 1000000);
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0,
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }

    /** Compress and flush contents to dfs */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressWriter

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt("io.file.buffer.size", 4096);
  }

  /** Reads key/value pairs from a sequence-format file.
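   *
   * <p>A minimal read loop, assuming an existing <code>conf</code> and
   * <code>path</code> (the key/value classes are taken from the file
   * header):</p>
   * <pre>
   * SequenceFile.Reader reader =
   *     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
   * try {
   *   Writable key = (Writable)
   *       ReflectionUtils.newInstance(reader.getKeyClass(), conf);
   *   Writable value = (Writable)
   *       ReflectionUtils.newInstance(reader.getValueClass(), conf);
   *   while (reader.next(key, value)) {
   *     // process key/value
   *   }
   * } finally {
   *   reader.close();
   * }
   * </pre>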
   */
  public static class Reader implements java.io.Closeable {
    private String filename;
    private FSDataInputStream in;
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;

    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;

    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;

    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;

    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;

    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /**
     * A tag interface for all of the Reader options
     */
    public static interface Option {}

    /**
     * Create an option to specify the path name of the sequence file.
     * @param value the path to read
     * @return a new option
     */
    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * Create an option to specify the stream with the sequence file.
     * @param value the stream to read.
     * @return a new option
     */
    public static Option stream(FSDataInputStream value) {
      return new InputStreamOption(value);
    }

    /**
     * Create an option to specify the starting byte to read.
     * @param value the number of bytes to skip over
     * @return a new option
     */
    public static Option start(long value) {
      return new StartOption(value);
    }

    /**
     * Create an option to specify the number of bytes to read.
     * @param value the number of bytes to read
     * @return a new option
     */
    public static Option length(long value) {
      return new LengthOption(value);
    }

    /**
     * Create an option with the buffer size for reading the given pathname.
     * @param value the number of bytes to buffer
     * @return a new option
     */
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    private static class FileOption extends Options.PathOption
                                    implements Option {
      private FileOption(Path value) {
        super(value);
      }
    }

    private static class InputStreamOption
        extends Options.FSDataInputStreamOption
        implements Option {
      private InputStreamOption(FSDataInputStream value) {
        super(value);
      }
    }

    private static class StartOption extends Options.LongOption
                                     implements Option {
      private StartOption(long value) {
        super(value);
      }
    }

    private static class LengthOption extends Options.LongOption
                                      implements Option {
      private LengthOption(long value) {
        super(value);
      }
    }

    private static class BufferSizeOption extends Options.IntegerOption
                                          implements Option {
      private BufferSizeOption(int value) {
        super(value);
      }
    }

    // only used directly
    private static class OnlyHeaderOption extends Options.BooleanOption
                                          implements Option {
      private OnlyHeaderOption() {
        super(true);
      }
    }

    public Reader(Configuration conf, Option... opts) throws IOException {
      // Look up the options, these are null if not set
      FileOption fileOpt = Options.getOption(FileOption.class, opts);
      InputStreamOption streamOpt =
        Options.getOption(InputStreamOption.class, opts);
      StartOption startOpt = Options.getOption(StartOption.class, opts);
      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts);
      OnlyHeaderOption headerOnly =
        Options.getOption(OnlyHeaderOption.class, opts);
      // check for consistency
      if ((fileOpt == null) == (streamOpt == null)) {
        throw new
          IllegalArgumentException("File or stream option must be specified");
      }
      if (fileOpt == null && bufOpt != null) {
        throw new IllegalArgumentException("buffer size can only be set when" +
                                           " a file is specified.");
      }
      // figure out the real values
      Path filename = null;
      FSDataInputStream file;
      final long len;
      if (fileOpt != null) {
        filename = fileOpt.getValue();
        FileSystem fs = filename.getFileSystem(conf);
        int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue();
        len = null == lenOpt
          ? fs.getFileStatus(filename).getLen()
          : lenOpt.getValue();
        file = openFile(fs, filename, bufSize, len);
      } else {
        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
        file = streamOpt.getValue();
      }
      long start = startOpt == null ? 0 : startOpt.getValue();
      // really set up
      initialize(filename, file, start, len, conf, headerOnly != null);
    }

    /**
     * Construct a reader by opening a file from the given file system.
     * @param fs The file system used to open the file.
     * @param file The file being read.
     * @param conf Configuration
     * @throws IOException
     * @deprecated Use Reader(Configuration, Option...) instead.
     */
    @Deprecated
    public Reader(FileSystem fs, Path file,
                  Configuration conf) throws IOException {
      this(conf, file(file.makeQualified(fs)));
    }

    /**
     * Construct a reader by the given input stream.
     * @param in An input stream.
1779 * @param buffersize unused 1780 * @param start The starting position. 1781 * @param length The length being read. 1782 * @param conf Configuration 1783 * @throws IOException 1784 * @deprecated Use Reader(Configuration, Reader.Option...) instead. 1785 */ 1786 @Deprecated 1787 public Reader(FSDataInputStream in, int buffersize, 1788 long start, long length, Configuration conf) throws IOException { 1789 this(conf, stream(in), start(start), length(length)); 1790 } 1791 1792 /** Common work of the constructors. */ 1793 private void initialize(Path filename, FSDataInputStream in, 1794 long start, long length, Configuration conf, 1795 boolean tempReader) throws IOException { 1796 if (in == null) { 1797 throw new IllegalArgumentException("in == null"); 1798 } 1799 this.filename = filename == null ? "<unknown>" : filename.toString(); 1800 this.in = in; 1801 this.conf = conf; 1802 boolean succeeded = false; 1803 try { 1804 seek(start); 1805 this.end = this.in.getPos() + length; 1806 // if it wrapped around, use the max 1807 if (end < length) { 1808 end = Long.MAX_VALUE; 1809 } 1810 init(tempReader); 1811 succeeded = true; 1812 } finally { 1813 if (!succeeded) { 1814 IOUtils.cleanup(LOG, this.in); 1815 } 1816 } 1817 } 1818 1819 /** 1820 * Override this method to specialize the type of 1821 * {@link FSDataInputStream} returned. 1822 * @param fs The file system used to open the file. 1823 * @param file The file being read. 1824 * @param bufferSize The buffer size used to read the file. 1825 * @param length The length being read if it is >= 0. Otherwise, 1826 * the length is not available. 1827 * @return The opened stream. 1828 * @throws IOException 1829 */ 1830 protected FSDataInputStream openFile(FileSystem fs, Path file, 1831 int bufferSize, long length) throws IOException { 1832 return fs.open(file, bufferSize); 1833 } 1834 1835 /** 1836 * Initialize the {@link Reader}. 1837 * @param tempReader <code>true</code> if we are constructing a temporary 1838 * reader (see {@link SequenceFile.Sorter#cloneFileAttributes}), 1839 * and hence do not initialize every component; 1840 * <code>false</code> otherwise. 1841 * @throws IOException 1842 */ 1843 private void init(boolean tempReader) throws IOException { 1844 byte[] versionBlock = new byte[VERSION.length]; 1845 in.readFully(versionBlock); 1846 1847 if ((versionBlock[0] != VERSION[0]) || 1848 (versionBlock[1] != VERSION[1]) || 1849 (versionBlock[2] != VERSION[2])) 1850 throw new IOException(this + " not a SequenceFile"); 1851 1852 // Set 'version' 1853 version = versionBlock[3]; 1854 if (version > VERSION[3]) 1855 throw new VersionMismatchException(VERSION[3], version); 1856 1857 if (version < BLOCK_COMPRESS_VERSION) { 1858 UTF8 className = new UTF8(); 1859 1860 className.readFields(in); 1861 keyClassName = className.toStringChecked(); // key class name 1862 1863 className.readFields(in); 1864 valClassName = className.toStringChecked(); // val class name 1865 } else { 1866 keyClassName = Text.readString(in); 1867 valClassName = Text.readString(in); 1868 } 1869 1870 if (version > 2) { // if version > 2 1871 this.decompress = in.readBoolean(); // is compressed? 1872 } else { 1873 decompress = false; 1874 } 1875 1876 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 1877 this.blockCompressed = in.readBoolean(); // is block-compressed?
1878 } else { 1879 blockCompressed = false; 1880 } 1881 1882 // if version >= 5 1883 // setup the compression codec 1884 if (decompress) { 1885 if (version >= CUSTOM_COMPRESS_VERSION) { 1886 String codecClassname = Text.readString(in); 1887 try { 1888 Class<? extends CompressionCodec> codecClass 1889 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); 1890 this.codec = ReflectionUtils.newInstance(codecClass, conf); 1891 } catch (ClassNotFoundException cnfe) { 1892 throw new IllegalArgumentException("Unknown codec: " + 1893 codecClassname, cnfe); 1894 } 1895 } else { 1896 codec = new DefaultCodec(); 1897 ((Configurable)codec).setConf(conf); 1898 } 1899 } 1900 1901 this.metadata = new Metadata(); 1902 if (version >= VERSION_WITH_METADATA) { // if version >= 6 1903 this.metadata.readFields(in); 1904 } 1905 1906 if (version > 1) { // if version > 1 1907 in.readFully(sync); // read sync bytes 1908 headerEnd = in.getPos(); // record end of header 1909 } 1910 1911 // Initialize... *not* if we are constructing a temporary Reader 1912 if (!tempReader) { 1913 valBuffer = new DataInputBuffer(); 1914 if (decompress) { 1915 valDecompressor = CodecPool.getDecompressor(codec); 1916 valInFilter = codec.createInputStream(valBuffer, valDecompressor); 1917 valIn = new DataInputStream(valInFilter); 1918 } else { 1919 valIn = valBuffer; 1920 } 1921 1922 if (blockCompressed) { 1923 keyLenBuffer = new DataInputBuffer(); 1924 keyBuffer = new DataInputBuffer(); 1925 valLenBuffer = new DataInputBuffer(); 1926 1927 keyLenDecompressor = CodecPool.getDecompressor(codec); 1928 keyLenInFilter = codec.createInputStream(keyLenBuffer, 1929 keyLenDecompressor); 1930 keyLenIn = new DataInputStream(keyLenInFilter); 1931 1932 keyDecompressor = CodecPool.getDecompressor(codec); 1933 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); 1934 keyIn = new DataInputStream(keyInFilter); 1935 1936 valLenDecompressor = CodecPool.getDecompressor(codec); 1937 valLenInFilter = codec.createInputStream(valLenBuffer, 1938 valLenDecompressor); 1939 valLenIn = new DataInputStream(valLenInFilter); 1940 } 1941 1942 SerializationFactory serializationFactory = 1943 new SerializationFactory(conf); 1944 this.keyDeserializer = 1945 getDeserializer(serializationFactory, getKeyClass()); 1946 if (this.keyDeserializer == null) { 1947 throw new IOException( 1948 "Could not find a deserializer for the Key class: '" 1949 + getKeyClass().getCanonicalName() + "'. " 1950 + "Please ensure that the configuration '" + 1951 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1952 + "properly configured, if you're using " 1953 + "custom serialization."); 1954 } 1955 if (!blockCompressed) { 1956 this.keyDeserializer.open(valBuffer); 1957 } else { 1958 this.keyDeserializer.open(keyIn); 1959 } 1960 this.valDeserializer = 1961 getDeserializer(serializationFactory, getValueClass()); 1962 if (this.valDeserializer == null) { 1963 throw new IOException( 1964 "Could not find a deserializer for the Value class: '" 1965 + getValueClass().getCanonicalName() + "'. " 1966 + "Please ensure that the configuration '" + 1967 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1968 + "properly configured, if you're using " 1969 + "custom serialization."); 1970 } 1971 this.valDeserializer.open(valIn); 1972 } 1973 } 1974 1975 @SuppressWarnings("unchecked") 1976 private Deserializer getDeserializer(SerializationFactory sf, Class c) { 1977 return sf.getDeserializer(c); 1978 } 1979 1980 /** Close the file.
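*
* <p>Since <code>Reader</code> implements {@link java.io.Closeable}, it can
* be managed with try-with-resources. A hedged sketch (the path is an
* assumption, and the file is assumed to hold {@link Text} keys and values):
* <pre>
* Configuration conf = new Configuration();
* try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
*          SequenceFile.Reader.file(new Path("/data/example.seq")))) {
*   Text key = new Text();
*   Text value = new Text();
*   while (reader.next(key, value)) {
*     // process the current key/value pair
*   }
* }
* </pre>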
*/ 1981 @Override 1982 public synchronized void close() throws IOException { 1983 // Return the decompressors to the pool 1984 CodecPool.returnDecompressor(keyLenDecompressor); 1985 CodecPool.returnDecompressor(keyDecompressor); 1986 CodecPool.returnDecompressor(valLenDecompressor); 1987 CodecPool.returnDecompressor(valDecompressor); 1988 keyLenDecompressor = keyDecompressor = null; 1989 valLenDecompressor = valDecompressor = null; 1990 1991 if (keyDeserializer != null) { 1992 keyDeserializer.close(); 1993 } 1994 if (valDeserializer != null) { 1995 valDeserializer.close(); 1996 } 1997 1998 // Close the input-stream 1999 in.close(); 2000 } 2001 2002 /** Returns the name of the key class. */ 2003 public String getKeyClassName() { 2004 return keyClassName; 2005 } 2006 2007 /** Returns the class of keys in this file. */ 2008 public synchronized Class<?> getKeyClass() { 2009 if (null == keyClass) { 2010 try { 2011 keyClass = WritableName.getClass(getKeyClassName(), conf); 2012 } catch (IOException e) { 2013 throw new RuntimeException(e); 2014 } 2015 } 2016 return keyClass; 2017 } 2018 2019 /** Returns the name of the value class. */ 2020 public String getValueClassName() { 2021 return valClassName; 2022 } 2023 2024 /** Returns the class of values in this file. */ 2025 public synchronized Class<?> getValueClass() { 2026 if (null == valClass) { 2027 try { 2028 valClass = WritableName.getClass(getValueClassName(), conf); 2029 } catch (IOException e) { 2030 throw new RuntimeException(e); 2031 } 2032 } 2033 return valClass; 2034 } 2035 2036 /** Returns true if values are compressed. */ 2037 public boolean isCompressed() { return decompress; } 2038 2039 /** Returns true if records are block-compressed. */ 2040 public boolean isBlockCompressed() { return blockCompressed; } 2041 2042 /** Returns the compression codec of data in this file. */ 2043 public CompressionCodec getCompressionCodec() { return codec; } 2044 2045 /** 2046 * Get the compression type for this file. 2047 * @return the compression type 2048 */ 2049 public CompressionType getCompressionType() { 2050 if (decompress) { 2051 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 2052 } else { 2053 return CompressionType.NONE; 2054 } 2055 } 2056 2057 /** Returns the metadata object of the file */ 2058 public Metadata getMetadata() { 2059 return this.metadata; 2060 } 2061 2062 /** Returns the configuration used for this file. 
*/ 2063 Configuration getConf() { return conf; } 2064 2065 /** Read a compressed buffer */ 2066 private synchronized void readBuffer(DataInputBuffer buffer, 2067 CompressionInputStream filter) throws IOException { 2068 // Read data into a temporary buffer 2069 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 2070 2071 try { 2072 int dataBufferLength = WritableUtils.readVInt(in); 2073 dataBuffer.write(in, dataBufferLength); 2074 2075 // Set up 'buffer' connected to the input-stream 2076 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 2077 } finally { 2078 dataBuffer.close(); 2079 } 2080 2081 // Reset the codec 2082 filter.resetState(); 2083 } 2084 2085 /** Read the next 'compressed' block */ 2086 private synchronized void readBlock() throws IOException { 2087 // Check if we need to throw away a whole block of 2088 // 'values' due to 'lazy decompression' 2089 if (lazyDecompress && !valuesDecompressed) { 2090 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2091 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2092 } 2093 2094 // Reset internal states 2095 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2096 valuesDecompressed = false; 2097 2098 //Process sync 2099 if (sync != null) { 2100 in.readInt(); 2101 in.readFully(syncCheck); // read syncCheck 2102 if (!Arrays.equals(sync, syncCheck)) // check it 2103 throw new IOException("File is corrupt!"); 2104 } 2105 syncSeen = true; 2106 2107 // Read number of records in this block 2108 noBufferedRecords = WritableUtils.readVInt(in); 2109 2110 // Read key lengths and keys 2111 readBuffer(keyLenBuffer, keyLenInFilter); 2112 readBuffer(keyBuffer, keyInFilter); 2113 noBufferedKeys = noBufferedRecords; 2114 2115 // Read value lengths and values 2116 if (!lazyDecompress) { 2117 readBuffer(valLenBuffer, valLenInFilter); 2118 readBuffer(valBuffer, valInFilter); 2119 noBufferedValues = noBufferedRecords; 2120 valuesDecompressed = true; 2121 } 2122 } 2123 2124 /** 2125 * Position valLenIn/valIn to the 'value' 2126 * corresponding to the 'current' key 2127 */ 2128 private synchronized void seekToCurrentValue() throws IOException { 2129 if (!blockCompressed) { 2130 if (decompress) { 2131 valInFilter.resetState(); 2132 } 2133 valBuffer.reset(); 2134 } else { 2135 // Check if this is the first value in the 'block' to be read 2136 if (lazyDecompress && !valuesDecompressed) { 2137 // Read the value lengths and values 2138 readBuffer(valLenBuffer, valLenInFilter); 2139 readBuffer(valBuffer, valInFilter); 2140 noBufferedValues = noBufferedRecords; 2141 valuesDecompressed = true; 2142 } 2143 2144 // Calculate the no. of bytes to skip 2145 // Note: 'current' key has already been read! 2146 int skipValBytes = 0; 2147 int currentKey = noBufferedKeys + 1; 2148 for (int i=noBufferedValues; i > currentKey; --i) { 2149 skipValBytes += WritableUtils.readVInt(valLenIn); 2150 --noBufferedValues; 2151 } 2152 2153 // Skip to the 'val' corresponding to 'current' key 2154 if (skipValBytes > 0) { 2155 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2156 throw new IOException("Failed to seek to " + currentKey + 2157 "(th) value!"); 2158 } 2159 } 2160 } 2161 } 2162 2163 /** 2164 * Get the 'value' corresponding to the last read 'key'. 2165 * @param val : The 'value' to be read. 
2166 * @throws IOException 2167 */ 2168 public synchronized void getCurrentValue(Writable val) 2169 throws IOException { 2170 if (val instanceof Configurable) { 2171 ((Configurable) val).setConf(this.conf); 2172 } 2173 2174 // Position stream to 'current' value 2175 seekToCurrentValue(); 2176 2177 if (!blockCompressed) { 2178 val.readFields(valIn); 2179 2180 if (valIn.read() > 0) { 2181 LOG.info("available bytes: " + valIn.available()); 2182 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2183 + " bytes, should read " + 2184 (valBuffer.getLength()-keyLength)); 2185 } 2186 } else { 2187 // Get the value 2188 int valLength = WritableUtils.readVInt(valLenIn); 2189 val.readFields(valIn); 2190 2191 // Read another compressed 'value' 2192 --noBufferedValues; 2193 2194 // Sanity check 2195 if ((valLength < 0) && LOG.isDebugEnabled()) { 2196 LOG.debug(val + " is a zero-length value"); 2197 } 2198 } 2199 2200 } 2201 2202 /** 2203 * Get the 'value' corresponding to the last read 'key'. 2204 * @param val : The 'value' to be read. 2205 * @throws IOException 2206 */ 2207 public synchronized Object getCurrentValue(Object val) 2208 throws IOException { 2209 if (val instanceof Configurable) { 2210 ((Configurable) val).setConf(this.conf); 2211 } 2212 2213 // Position stream to 'current' value 2214 seekToCurrentValue(); 2215 2216 if (!blockCompressed) { 2217 val = deserializeValue(val); 2218 2219 if (valIn.read() > 0) { 2220 LOG.info("available bytes: " + valIn.available()); 2221 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2222 + " bytes, should read " + 2223 (valBuffer.getLength()-keyLength)); 2224 } 2225 } else { 2226 // Get the value 2227 int valLength = WritableUtils.readVInt(valLenIn); 2228 val = deserializeValue(val); 2229 2230 // Read another compressed 'value' 2231 --noBufferedValues; 2232 2233 // Sanity check 2234 if ((valLength < 0) && LOG.isDebugEnabled()) { 2235 LOG.debug(val + " is a zero-length value"); 2236 } 2237 } 2238 return val; 2239 2240 } 2241 2242 @SuppressWarnings("unchecked") 2243 private Object deserializeValue(Object val) throws IOException { 2244 return valDeserializer.deserialize(val); 2245 } 2246 2247 /** Read the next key in the file into <code>key</code>, skipping its 2248 * value. True if another entry exists, and false at end of file. */ 2249 public synchronized boolean next(Writable key) throws IOException { 2250 if (key.getClass() != getKeyClass()) 2251 throw new IOException("wrong key class: "+key.getClass().getName() 2252 +" is not "+keyClass); 2253 2254 if (!blockCompressed) { 2255 outBuf.reset(); 2256 2257 keyLength = next(outBuf); 2258 if (keyLength < 0) 2259 return false; 2260 2261 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2262 2263 key.readFields(valBuffer); 2264 valBuffer.mark(0); 2265 if (valBuffer.getPosition() != keyLength) 2266 throw new IOException(key + " read " + valBuffer.getPosition() 2267 + " bytes, should read " + keyLength); 2268 } else { 2269 //Reset syncSeen 2270 syncSeen = false; 2271 2272 if (noBufferedKeys == 0) { 2273 try { 2274 readBlock(); 2275 } catch (EOFException eof) { 2276 return false; 2277 } 2278 } 2279 2280 int keyLength = WritableUtils.readVInt(keyLenIn); 2281 2282 // Sanity check 2283 if (keyLength < 0) { 2284 return false; 2285 } 2286 2287 //Read another compressed 'key' 2288 key.readFields(keyIn); 2289 --noBufferedKeys; 2290 } 2291 2292 return true; 2293 } 2294 2295 /** Read the next key/value pair in the file into <code>key</code> and 2296 * <code>val</code>. 
Returns true if such a pair exists and false when at 2297 * end of file */ 2298 public synchronized boolean next(Writable key, Writable val) 2299 throws IOException { 2300 if (val.getClass() != getValueClass()) 2301 throw new IOException("wrong value class: "+val+" is not "+valClass); 2302 2303 boolean more = next(key); 2304 2305 if (more) { 2306 getCurrentValue(val); 2307 } 2308 2309 return more; 2310 } 2311 2312 /** 2313 * Read and return the next record length, potentially skipping over 2314 * a sync block. 2315 * @return the length of the next record or -1 if there is no next record 2316 * @throws IOException 2317 */ 2318 private synchronized int readRecordLength() throws IOException { 2319 if (in.getPos() >= end) { 2320 return -1; 2321 } 2322 int length = in.readInt(); 2323 if (version > 1 && sync != null && 2324 length == SYNC_ESCAPE) { // process a sync entry 2325 in.readFully(syncCheck); // read syncCheck 2326 if (!Arrays.equals(sync, syncCheck)) // check it 2327 throw new IOException("File is corrupt!"); 2328 syncSeen = true; 2329 if (in.getPos() >= end) { 2330 return -1; 2331 } 2332 length = in.readInt(); // re-read length 2333 } else { 2334 syncSeen = false; 2335 } 2336 2337 return length; 2338 } 2339 2340 /** Read the next key/value pair in the file into <code>buffer</code>. 2341 * Returns the length of the key read, or -1 if at end of file. The length 2342 * of the value may be computed by calling buffer.getLength() before and 2343 * after calls to this method. */ 2344 /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ 2345 @Deprecated 2346 synchronized int next(DataOutputBuffer buffer) throws IOException { 2347 // Unsupported for block-compressed sequence files 2348 if (blockCompressed) { 2349 throw new IOException("Unsupported call for block-compressed" + 2350 " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)"); 2351 } 2352 try { 2353 int length = readRecordLength(); 2354 if (length == -1) { 2355 return -1; 2356 } 2357 int keyLength = in.readInt(); 2358 buffer.write(in, length); 2359 return keyLength; 2360 } catch (ChecksumException e) { // checksum failure 2361 handleChecksumException(e); 2362 return next(buffer); 2363 } 2364 } 2365 2366 public ValueBytes createValueBytes() { 2367 ValueBytes val = null; 2368 if (!decompress || blockCompressed) { 2369 val = new UncompressedBytes(); 2370 } else { 2371 val = new CompressedBytes(codec); 2372 } 2373 return val; 2374 } 2375 2376 /** 2377 * Read 'raw' records. 
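* <p>A sketch of raw iteration (illustrative; <code>reader</code> is assumed
* to be an open <code>Reader</code>), useful when records are copied without
* being deserialized:
* <pre>
* DataOutputBuffer rawKey = new DataOutputBuffer();
* SequenceFile.ValueBytes rawValue = reader.createValueBytes();
* while (reader.nextRaw(rawKey, rawValue) != -1) {
*   // rawKey holds the serialized key; rawValue can be forwarded verbatim,
*   // e.g. via Writer.appendRaw(...)
*   rawKey.reset();
* }
* </pre>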
2378 * @param key - The buffer into which the key is read 2379 * @param val - The 'raw' value 2380 * @return Returns the total record length or -1 for end of file 2381 * @throws IOException 2382 */ 2383 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 2384 throws IOException { 2385 if (!blockCompressed) { 2386 int length = readRecordLength(); 2387 if (length == -1) { 2388 return -1; 2389 } 2390 int keyLength = in.readInt(); 2391 int valLength = length - keyLength; 2392 key.write(in, keyLength); 2393 if (decompress) { 2394 CompressedBytes value = (CompressedBytes)val; 2395 value.reset(in, valLength); 2396 } else { 2397 UncompressedBytes value = (UncompressedBytes)val; 2398 value.reset(in, valLength); 2399 } 2400 2401 return length; 2402 } else { 2403 //Reset syncSeen 2404 syncSeen = false; 2405 2406 // Read 'key' 2407 if (noBufferedKeys == 0) { 2408 if (in.getPos() >= end) 2409 return -1; 2410 2411 try { 2412 readBlock(); 2413 } catch (EOFException eof) { 2414 return -1; 2415 } 2416 } 2417 int keyLength = WritableUtils.readVInt(keyLenIn); 2418 if (keyLength < 0) { 2419 throw new IOException("negative key length found!"); 2420 } 2421 key.write(keyIn, keyLength); 2422 --noBufferedKeys; 2423 2424 // Read raw 'value' 2425 seekToCurrentValue(); 2426 int valLength = WritableUtils.readVInt(valLenIn); 2427 UncompressedBytes rawValue = (UncompressedBytes)val; 2428 rawValue.reset(valIn, valLength); 2429 --noBufferedValues; 2430 2431 return (keyLength+valLength); 2432 } 2433 2434 } 2435 2436 /** 2437 * Read 'raw' keys. 2438 * @param key - The buffer into which the key is read 2439 * @return Returns the key length or -1 for end of file 2440 * @throws IOException 2441 */ 2442 public synchronized int nextRawKey(DataOutputBuffer key) 2443 throws IOException { 2444 if (!blockCompressed) { 2445 recordLength = readRecordLength(); 2446 if (recordLength == -1) { 2447 return -1; 2448 } 2449 keyLength = in.readInt(); 2450 key.write(in, keyLength); 2451 return keyLength; 2452 } else { 2453 //Reset syncSeen 2454 syncSeen = false; 2455 2456 // Read 'key' 2457 if (noBufferedKeys == 0) { 2458 if (in.getPos() >= end) 2459 return -1; 2460 2461 try { 2462 readBlock(); 2463 } catch (EOFException eof) { 2464 return -1; 2465 } 2466 } 2467 int keyLength = WritableUtils.readVInt(keyLenIn); 2468 if (keyLength < 0) { 2469 throw new IOException("negative key length found!"); 2470 } 2471 key.write(keyIn, keyLength); 2472 --noBufferedKeys; 2473 2474 return keyLength; 2475 } 2476 2477 } 2478 2479 /** Read the next key in the file, skipping its 2480 * value. Return null at end of file.
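* <p>A hedged sketch of a key-only scan (the reader and key type are
* assumptions):
* <pre>
* Object key = null;
* while ((key = reader.next(key)) != null) {
*   System.out.println(key);  // the corresponding value bytes are skipped
* }
* </pre>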
*/ 2481 public synchronized Object next(Object key) throws IOException { 2482 if (key != null && key.getClass() != getKeyClass()) { 2483 throw new IOException("wrong key class: "+key.getClass().getName() 2484 +" is not "+keyClass); 2485 } 2486 2487 if (!blockCompressed) { 2488 outBuf.reset(); 2489 2490 keyLength = next(outBuf); 2491 if (keyLength < 0) 2492 return null; 2493 2494 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2495 2496 key = deserializeKey(key); 2497 valBuffer.mark(0); 2498 if (valBuffer.getPosition() != keyLength) 2499 throw new IOException(key + " read " + valBuffer.getPosition() 2500 + " bytes, should read " + keyLength); 2501 } else { 2502 //Reset syncSeen 2503 syncSeen = false; 2504 2505 if (noBufferedKeys == 0) { 2506 try { 2507 readBlock(); 2508 } catch (EOFException eof) { 2509 return null; 2510 } 2511 } 2512 2513 int keyLength = WritableUtils.readVInt(keyLenIn); 2514 2515 // Sanity check 2516 if (keyLength < 0) { 2517 return null; 2518 } 2519 2520 //Read another compressed 'key' 2521 key = deserializeKey(key); 2522 --noBufferedKeys; 2523 } 2524 2525 return key; 2526 } 2527 2528 @SuppressWarnings("unchecked") 2529 private Object deserializeKey(Object key) throws IOException { 2530 return keyDeserializer.deserialize(key); 2531 } 2532 2533 /** 2534 * Read 'raw' values. 2535 * @param val - The 'raw' value 2536 * @return Returns the value length 2537 * @throws IOException 2538 */ 2539 public synchronized int nextRawValue(ValueBytes val) 2540 throws IOException { 2541 2542 // Position stream to current value 2543 seekToCurrentValue(); 2544 2545 if (!blockCompressed) { 2546 int valLength = recordLength - keyLength; 2547 if (decompress) { 2548 CompressedBytes value = (CompressedBytes)val; 2549 value.reset(in, valLength); 2550 } else { 2551 UncompressedBytes value = (UncompressedBytes)val; 2552 value.reset(in, valLength); 2553 } 2554 2555 return valLength; 2556 } else { 2557 int valLength = WritableUtils.readVInt(valLenIn); 2558 UncompressedBytes rawValue = (UncompressedBytes)val; 2559 rawValue.reset(valIn, valLength); 2560 --noBufferedValues; 2561 return valLength; 2562 } 2563 2564 } 2565 2566 private void handleChecksumException(ChecksumException e) 2567 throws IOException { 2568 if (this.conf.getBoolean("io.skip.checksum.errors", false)) { 2569 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2570 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2571 } else { 2572 throw e; 2573 } 2574 } 2575 2576 /** disables sync. often invoked for tmp files */ 2577 synchronized void ignoreSync() { 2578 sync = null; 2579 } 2580 2581 /** Set the current byte position in the input file. 2582 * 2583 * <p>The position passed must be a position returned by {@link 2584 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2585 * position, use {@link SequenceFile.Reader#sync(long)}. 
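*
* <p>A short sketch of the distinction (the positions are assumptions):
* <pre>
* reader.seek(1024L);  // 1024 must be a record boundary from the writer
* reader.sync(1024L);  // safe for any offset: advances to the next sync mark
* </pre>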
2586 */ 2587 public synchronized void seek(long position) throws IOException { 2588 in.seek(position); 2589 if (blockCompressed) { // trigger block read 2590 noBufferedKeys = 0; 2591 valuesDecompressed = true; 2592 } 2593 } 2594 2595 /** Seek to the next sync mark past a given position.*/ 2596 public synchronized void sync(long position) throws IOException { 2597 if (position+SYNC_SIZE >= end) { 2598 seek(end); 2599 return; 2600 } 2601 2602 if (position < headerEnd) { 2603 // seek directly to first record 2604 in.seek(headerEnd); 2605 // note the sync marker "seen" in the header 2606 syncSeen = true; 2607 return; 2608 } 2609 2610 try { 2611 seek(position+4); // skip escape 2612 in.readFully(syncCheck); 2613 int syncLen = sync.length; 2614 for (int i = 0; in.getPos() < end; i++) { 2615 int j = 0; 2616 for (; j < syncLen; j++) { 2617 if (sync[j] != syncCheck[(i+j)%syncLen]) 2618 break; 2619 } 2620 if (j == syncLen) { 2621 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2622 return; 2623 } 2624 syncCheck[i%syncLen] = in.readByte(); 2625 } 2626 } catch (ChecksumException e) { // checksum failure 2627 handleChecksumException(e); 2628 } 2629 } 2630 2631 /** Returns true iff the previous call to next passed a sync mark.*/ 2632 public synchronized boolean syncSeen() { return syncSeen; } 2633 2634 /** Return the current byte position in the input file. */ 2635 public synchronized long getPosition() throws IOException { 2636 return in.getPos(); 2637 } 2638 2639 /** Returns the name of the file. */ 2640 @Override 2641 public String toString() { 2642 return filename; 2643 } 2644 2645 } 2646 2647 /** Sorts key/value pairs in a sequence-format file. 2648 * 2649 * <p>For best performance, applications should make sure that the {@link 2650 * Writable#readFields(DataInput)} implementation of their keys is 2651 * very efficient. In particular, it should avoid allocating memory. 2652 */ 2653 public static class Sorter { 2654 2655 private RawComparator comparator; 2656 2657 private MergeSort mergeSort; //the implementation of merge sort 2658 2659 private Path[] inFiles; // when merging or sorting 2660 2661 private Path outFile; 2662 2663 private int memory; // bytes 2664 private int factor; // merged per pass 2665 2666 private FileSystem fs = null; 2667 2668 private Class keyClass; 2669 private Class valClass; 2670 2671 private Configuration conf; 2672 private Metadata metadata; 2673 2674 private Progressable progressable = null; 2675 2676 /** Sort and merge files containing the named classes. */ 2677 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2678 Class valClass, Configuration conf) { 2679 this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf); 2680 } 2681 2682 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2683 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2684 Class valClass, Configuration conf) { 2685 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2686 } 2687 2688 /** Sort and merge using an arbitrary {@link RawComparator}. 
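*
* <p>An illustrative construction (the comparator, key/value classes and
* metadata are assumptions):
* <pre>
* SequenceFile.Sorter sorter =
*     new SequenceFile.Sorter(fs, new Text.Comparator(), Text.class,
*                             BytesWritable.class, conf, new Metadata());
* </pre>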
*/ 2689 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2690 Class valClass, Configuration conf, Metadata metadata) { 2691 this.fs = fs; 2692 this.comparator = comparator; 2693 this.keyClass = keyClass; 2694 this.valClass = valClass; 2695 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2696 this.factor = conf.getInt("io.sort.factor", 100); 2697 this.conf = conf; 2698 this.metadata = metadata; 2699 } 2700 2701 /** Set the number of streams to merge at once.*/ 2702 public void setFactor(int factor) { this.factor = factor; } 2703 2704 /** Get the number of streams to merge at once.*/ 2705 public int getFactor() { return factor; } 2706 2707 /** Set the total amount of buffer memory, in bytes.*/ 2708 public void setMemory(int memory) { this.memory = memory; } 2709 2710 /** Get the total amount of buffer memory, in bytes.*/ 2711 public int getMemory() { return memory; } 2712 2713 /** Set the progressable object in order to report progress. */ 2714 public void setProgressable(Progressable progressable) { 2715 this.progressable = progressable; 2716 } 2717 2718 /** 2719 * Perform a file sort from a set of input files into an output file. 2720 * @param inFiles the files to be sorted 2721 * @param outFile the sorted output file 2722 * @param deleteInput should the input files be deleted as they are read? 2723 */ 2724 public void sort(Path[] inFiles, Path outFile, 2725 boolean deleteInput) throws IOException { 2726 if (fs.exists(outFile)) { 2727 throw new IOException("already exists: " + outFile); 2728 } 2729 2730 this.inFiles = inFiles; 2731 this.outFile = outFile; 2732 2733 int segments = sortPass(deleteInput); 2734 if (segments > 1) { 2735 mergePass(outFile.getParent()); 2736 } 2737 } 2738 2739 /** 2740 * Perform a file sort from a set of input files and return an iterator. 2741 * @param inFiles the files to be sorted 2742 * @param tempDir the directory where temp files are created during sort 2743 * @param deleteInput should the input files be deleted as they are read? 2744 * @return iterator the RawKeyValueIterator 2745 */ 2746 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2747 boolean deleteInput) throws IOException { 2748 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2749 if (fs.exists(outFile)) { 2750 throw new IOException("already exists: " + outFile); 2751 } 2752 this.inFiles = inFiles; 2753 //outFile will basically be used as prefix for temp files in the cases 2754 //where sort outputs multiple sorted segments. For the single segment 2755 //case, the outputFile itself will contain the sorted data for that 2756 //segment 2757 this.outFile = outFile; 2758 2759 int segments = sortPass(deleteInput); 2760 if (segments > 1) 2761 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2762 tempDir); 2763 else if (segments == 1) 2764 return merge(new Path[]{outFile}, true, tempDir); 2765 else return null; 2766 } 2767 2768 /** 2769 * The backwards compatible interface to sort. 
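* <p>As the body below shows, this simply sorts a one-element array without
* deleting the input; for example:
* <pre>
* sorter.sort(new Path("/data/in.seq"), new Path("/data/in.sorted.seq"));
* </pre>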
2770 * @param inFile the input file to sort 2771 * @param outFile the sorted output file 2772 */ 2773 public void sort(Path inFile, Path outFile) throws IOException { 2774 sort(new Path[]{inFile}, outFile, false); 2775 } 2776 2777 private int sortPass(boolean deleteInput) throws IOException { 2778 if(LOG.isDebugEnabled()) { 2779 LOG.debug("running sort pass"); 2780 } 2781 SortPass sortPass = new SortPass(); // make the SortPass 2782 sortPass.setProgressable(progressable); 2783 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2784 try { 2785 return sortPass.run(deleteInput); // run it 2786 } finally { 2787 sortPass.close(); // close it 2788 } 2789 } 2790 2791 private class SortPass { 2792 private int memoryLimit = memory/4; 2793 private int recordLimit = 1000000; 2794 2795 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2796 private byte[] rawBuffer; 2797 2798 private int[] keyOffsets = new int[1024]; 2799 private int[] pointers = new int[keyOffsets.length]; 2800 private int[] pointersCopy = new int[keyOffsets.length]; 2801 private int[] keyLengths = new int[keyOffsets.length]; 2802 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2803 2804 private ArrayList segmentLengths = new ArrayList(); 2805 2806 private Reader in = null; 2807 private FSDataOutputStream out = null; 2808 private FSDataOutputStream indexOut = null; 2809 private Path outName; 2810 2811 private Progressable progressable = null; 2812 2813 public int run(boolean deleteInput) throws IOException { 2814 int segments = 0; 2815 int currentFile = 0; 2816 boolean atEof = (currentFile >= inFiles.length); 2817 CompressionType compressionType; 2818 CompressionCodec codec = null; 2819 segmentLengths.clear(); 2820 if (atEof) { 2821 return 0; 2822 } 2823 2824 // Initialize 2825 in = new Reader(fs, inFiles[currentFile], conf); 2826 compressionType = in.getCompressionType(); 2827 codec = in.getCompressionCodec(); 2828 2829 for (int i=0; i < rawValues.length; ++i) { 2830 rawValues[i] = null; 2831 } 2832 2833 while (!atEof) { 2834 int count = 0; 2835 int bytesProcessed = 0; 2836 rawKeys.reset(); 2837 while (!atEof && 2838 bytesProcessed < memoryLimit && count < recordLimit) { 2839 2840 // Read a record into buffer 2841 // Note: Attempt to re-use 'rawValue' as far as possible 2842 int keyOffset = rawKeys.getLength(); 2843 ValueBytes rawValue = 2844 (count == keyOffsets.length || rawValues[count] == null) ? 
2845 in.createValueBytes() : 2846 rawValues[count]; 2847 int recordLength = in.nextRaw(rawKeys, rawValue); 2848 if (recordLength == -1) { 2849 in.close(); 2850 if (deleteInput) { 2851 fs.delete(inFiles[currentFile], true); 2852 } 2853 currentFile += 1; 2854 atEof = currentFile >= inFiles.length; 2855 if (!atEof) { 2856 in = new Reader(fs, inFiles[currentFile], conf); 2857 } else { 2858 in = null; 2859 } 2860 continue; 2861 } 2862 2863 int keyLength = rawKeys.getLength() - keyOffset; 2864 2865 if (count == keyOffsets.length) 2866 grow(); 2867 2868 keyOffsets[count] = keyOffset; // update pointers 2869 pointers[count] = count; 2870 keyLengths[count] = keyLength; 2871 rawValues[count] = rawValue; 2872 2873 bytesProcessed += recordLength; 2874 count++; 2875 } 2876 2877 // buffer is full -- sort & flush it 2878 if(LOG.isDebugEnabled()) { 2879 LOG.debug("flushing segment " + segments); 2880 } 2881 rawBuffer = rawKeys.getData(); 2882 sort(count); 2883 // indicate we're making progress 2884 if (progressable != null) { 2885 progressable.progress(); 2886 } 2887 flush(count, bytesProcessed, compressionType, codec, 2888 segments==0 && atEof); 2889 segments++; 2890 } 2891 return segments; 2892 } 2893 2894 public void close() throws IOException { 2895 if (in != null) { 2896 in.close(); 2897 } 2898 if (out != null) { 2899 out.close(); 2900 } 2901 if (indexOut != null) { 2902 indexOut.close(); 2903 } 2904 } 2905 2906 private void grow() { 2907 int newLength = keyOffsets.length * 3 / 2; 2908 keyOffsets = grow(keyOffsets, newLength); 2909 pointers = grow(pointers, newLength); 2910 pointersCopy = new int[newLength]; 2911 keyLengths = grow(keyLengths, newLength); 2912 rawValues = grow(rawValues, newLength); 2913 } 2914 2915 private int[] grow(int[] old, int newLength) { 2916 int[] result = new int[newLength]; 2917 System.arraycopy(old, 0, result, 0, old.length); 2918 return result; 2919 } 2920 2921 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 2922 ValueBytes[] result = new ValueBytes[newLength]; 2923 System.arraycopy(old, 0, result, 0, old.length); 2924 for (int i=old.length; i < newLength; ++i) { 2925 result[i] = null; 2926 } 2927 return result; 2928 } 2929 2930 private void flush(int count, int bytesProcessed, 2931 CompressionType compressionType, 2932 CompressionCodec codec, 2933 boolean done) throws IOException { 2934 if (out == null) { 2935 outName = done ? outFile : outFile.suffix(".0"); 2936 out = fs.create(outName); 2937 if (!done) { 2938 indexOut = fs.create(outName.suffix(".index")); 2939 } 2940 } 2941 2942 long segmentStart = out.getPos(); 2943 Writer writer = createWriter(conf, Writer.stream(out), 2944 Writer.keyClass(keyClass), Writer.valueClass(valClass), 2945 Writer.compression(compressionType, codec), 2946 Writer.metadata(done ? 
metadata : new Metadata())); 2947 2948 if (!done) { 2949 writer.sync = null; // disable sync on temp files 2950 } 2951 2952 for (int i = 0; i < count; i++) { // write in sorted order 2953 int p = pointers[i]; 2954 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); 2955 } 2956 writer.close(); 2957 2958 if (!done) { 2959 // Save the segment length 2960 WritableUtils.writeVLong(indexOut, segmentStart); 2961 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart)); 2962 indexOut.flush(); 2963 } 2964 } 2965 2966 private void sort(int count) { 2967 System.arraycopy(pointers, 0, pointersCopy, 0, count); 2968 mergeSort.mergeSort(pointersCopy, pointers, 0, count); 2969 } 2970 class SeqFileComparator implements Comparator<IntWritable> { 2971 @Override 2972 public int compare(IntWritable I, IntWritable J) { 2973 return comparator.compare(rawBuffer, keyOffsets[I.get()], 2974 keyLengths[I.get()], rawBuffer, 2975 keyOffsets[J.get()], keyLengths[J.get()]); 2976 } 2977 } 2978 2979 /** set the progressable object in order to report progress */ 2980 public void setProgressable(Progressable progressable) 2981 { 2982 this.progressable = progressable; 2983 } 2984 2985 } // SequenceFile.Sorter.SortPass 2986 2987 /** The interface to iterate over raw keys/values of SequenceFiles. */ 2988 public static interface RawKeyValueIterator { 2989 /** Gets the current raw key 2990 * @return DataOutputBuffer 2991 * @throws IOException 2992 */ 2993 DataOutputBuffer getKey() throws IOException; 2994 /** Gets the current raw value 2995 * @return ValueBytes 2996 * @throws IOException 2997 */ 2998 ValueBytes getValue() throws IOException; 2999 /** Sets up the current key and value (for getKey and getValue) 3000 * @return true if there exists a key/value, false otherwise 3001 * @throws IOException 3002 */ 3003 boolean next() throws IOException; 3004 /** closes the iterator so that the underlying streams can be closed 3005 * @throws IOException 3006 */ 3007 void close() throws IOException; 3008 /** Gets the Progress object; this has a float (0.0 - 1.0) 3009 * indicating the bytes processed by the iterator so far 3010 */ 3011 Progress getProgress(); 3012 } 3013 3014 /** 3015 * Merges the list of segments of type <code>SegmentDescriptor</code> 3016 * @param segments the list of SegmentDescriptors 3017 * @param tmpDir the directory to write temporary files into 3018 * @return RawKeyValueIterator 3019 * @throws IOException 3020 */ 3021 public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 3022 Path tmpDir) 3023 throws IOException { 3024 // pass in object to report progress, if present 3025 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); 3026 return mQueue.merge(); 3027 } 3028 3029 /** 3030 * Merges the contents of files passed in Path[] using the merge factor 3031 * that has already been set 3032 * @param inNames the array of path names 3033 * @param deleteInputs true if the input files should be deleted when 3034 * unnecessary 3035 * @param tmpDir the directory to write temporary files into 3036 * @return RawKeyValueIterator 3037 * @throws IOException 3038 */ 3039 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3040 Path tmpDir) 3041 throws IOException { 3042 return merge(inNames, deleteInputs, 3043 (inNames.length < factor) ? 
inNames.length : factor, 3044 tmpDir); 3045 } 3046 3047 /** 3048 * Merges the contents of files passed in Path[] 3049 * @param inNames the array of path names 3050 * @param deleteInputs true if the input files should be deleted when 3051 * unnecessary 3052 * @param factor the factor that will be used as the maximum merge fan-in 3053 * @param tmpDir the directory to write temporary files into 3054 * @return RawKeyValueIterator 3055 * @throws IOException 3056 */ 3057 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3058 int factor, Path tmpDir) 3059 throws IOException { 3060 //get the segments from inNames 3061 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3062 for (int i = 0; i < inNames.length; i++) { 3063 SegmentDescriptor s = new SegmentDescriptor(0, 3064 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3065 s.preserveInput(!deleteInputs); 3066 s.doSync(); 3067 a.add(s); 3068 } 3069 this.factor = factor; 3070 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); 3071 return mQueue.merge(); 3072 } 3073 3074 /** 3075 * Merges the contents of files passed in Path[] 3076 * @param inNames the array of path names 3077 * @param tempDir the directory for creating temp files during merge 3078 * @param deleteInputs true if the input files should be deleted when 3079 * unnecessary 3080 * @return RawKeyValueIterator 3081 * @throws IOException 3082 */ 3083 public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 3084 boolean deleteInputs) 3085 throws IOException { 3086 //outFile will basically be used as prefix for temp files for the 3087 //intermediate merge outputs 3088 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged"); 3089 //get the segments from inNames 3090 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3091 for (int i = 0; i < inNames.length; i++) { 3092 SegmentDescriptor s = new SegmentDescriptor(0, 3093 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3094 s.preserveInput(!deleteInputs); 3095 s.doSync(); 3096 a.add(s); 3097 } 3098 factor = (inNames.length < factor) ? 
inNames.length : factor; 3099 // pass in object to report progress, if present 3100 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); 3101 return mQueue.merge(); 3102 } 3103 3104 /** 3105 * Clones the attributes (like compression) of the input file and creates a 3106 * corresponding Writer 3107 * @param inputFile the path of the input file whose attributes should be 3108 * cloned 3109 * @param outputFile the path of the output file 3110 * @param prog the Progressable to report status during the file write 3111 * @return Writer 3112 * @throws IOException 3113 */ 3114 public Writer cloneFileAttributes(Path inputFile, Path outputFile, 3115 Progressable prog) throws IOException { 3116 Reader reader = new Reader(conf, 3117 Reader.file(inputFile), 3118 new Reader.OnlyHeaderOption()); 3119 CompressionType compress = reader.getCompressionType(); 3120 CompressionCodec codec = reader.getCompressionCodec(); 3121 reader.close(); 3122 3123 Writer writer = createWriter(conf, 3124 Writer.file(outputFile), 3125 Writer.keyClass(keyClass), 3126 Writer.valueClass(valClass), 3127 Writer.compression(compress, codec), 3128 Writer.progressable(prog)); 3129 return writer; 3130 } 3131 3132 /** 3133 * Writes records from RawKeyValueIterator into a file represented by the 3134 * passed writer 3135 * @param records the RawKeyValueIterator 3136 * @param writer the Writer created earlier 3137 * @throws IOException 3138 */ 3139 public void writeFile(RawKeyValueIterator records, Writer writer) 3140 throws IOException { 3141 while(records.next()) { 3142 writer.appendRaw(records.getKey().getData(), 0, 3143 records.getKey().getLength(), records.getValue()); 3144 } 3145 writer.sync(); 3146 } 3147 3148 /** Merge the provided files. 3149 * @param inFiles the array of input path names 3150 * @param outFile the final output file 3151 * @throws IOException 3152 */ 3153 public void merge(Path[] inFiles, Path outFile) throws IOException { 3154 if (fs.exists(outFile)) { 3155 throw new IOException("already exists: " + outFile); 3156 } 3157 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); 3158 Writer writer = cloneFileAttributes(inFiles[0], outFile, null); 3159 3160 writeFile(r, writer); 3161 3162 writer.close(); 3163 } 3164 3165 /** sort calls this to generate the final merged output */ 3166 private int mergePass(Path tmpDir) throws IOException { 3167 if(LOG.isDebugEnabled()) { 3168 LOG.debug("running merge pass"); 3169 } 3170 Writer writer = cloneFileAttributes( 3171 outFile.suffix(".0"), outFile, null); 3172 RawKeyValueIterator r = merge(outFile.suffix(".0"), 3173 outFile.suffix(".0.index"), tmpDir); 3174 writeFile(r, writer); 3175 3176 writer.close(); 3177 return 0; 3178 } 3179 3180 /** Used by mergePass to merge the output of the sort 3181 * @param inName the name of the input file containing sorted segments 3182 * @param indexIn the offsets of the sorted segments 3183 * @param tmpDir the relative directory to store intermediate results in 3184 * @return RawKeyValueIterator 3185 * @throws IOException 3186 */ 3187 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 3188 throws IOException { 3189 //get the segments from indexIn 3190 //we create a SegmentContainer so that we can track segments belonging to 3191 //inName and delete inName as soon as we see that we have looked at all 3192 //the contained segments during the merge process & hence don't need 3193 //them anymore 3194 SegmentContainer container = new SegmentContainer(inName, indexIn); 3195 MergeQueue mQueue = new 
MergeQueue(container.getSegmentList(), tmpDir, progressable); 3196 return mQueue.merge(); 3197 } 3198 3199 /** This class implements the core of the merge logic */ 3200 private class MergeQueue extends PriorityQueue 3201 implements RawKeyValueIterator { 3202 private boolean compress; 3203 private boolean blockCompress; 3204 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3205 private ValueBytes rawValue; 3206 private long totalBytesProcessed; 3207 private float progPerByte; 3208 private Progress mergeProgress = new Progress(); 3209 private Path tmpDir; 3210 private Progressable progress = null; //handle to the progress reporting object 3211 private SegmentDescriptor minSegment; 3212 3213 //a TreeMap used to store the segments sorted by size (segment offset and 3214 //segment path name is used to break ties between segments of same sizes) 3215 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3216 new TreeMap<SegmentDescriptor, Void>(); 3217 3218 @SuppressWarnings("unchecked") 3219 public void put(SegmentDescriptor stream) throws IOException { 3220 if (size() == 0) { 3221 compress = stream.in.isCompressed(); 3222 blockCompress = stream.in.isBlockCompressed(); 3223 } else if (compress != stream.in.isCompressed() || 3224 blockCompress != stream.in.isBlockCompressed()) { 3225 throw new IOException("All merged files must have the same compression setting."); 3226 } 3227 super.put(stream); 3228 } 3229 3230 /** 3231 * A queue of file segments to merge 3232 * @param segments the file segments to merge 3233 * @param tmpDir a relative local directory to save intermediate files in 3234 * @param progress the reference to the Progressable object 3235 */ 3236 public MergeQueue(List <SegmentDescriptor> segments, 3237 Path tmpDir, Progressable progress) { 3238 int size = segments.size(); 3239 for (int i = 0; i < size; i++) { 3240 sortedSegmentSizes.put(segments.get(i), null); 3241 } 3242 this.tmpDir = tmpDir; 3243 this.progress = progress; 3244 } 3245 @Override 3246 protected boolean lessThan(Object a, Object b) { 3247 // indicate we're making progress 3248 if (progress != null) { 3249 progress.progress(); 3250 } 3251 SegmentDescriptor msa = (SegmentDescriptor)a; 3252 SegmentDescriptor msb = (SegmentDescriptor)b; 3253 return comparator.compare(msa.getKey().getData(), 0, 3254 msa.getKey().getLength(), msb.getKey().getData(), 0, 3255 msb.getKey().getLength()) < 0; 3256 } 3257 @Override 3258 public void close() throws IOException { 3259 SegmentDescriptor ms; // close inputs 3260 while ((ms = (SegmentDescriptor)pop()) != null) { 3261 ms.cleanup(); 3262 } 3263 minSegment = null; 3264 } 3265 @Override 3266 public DataOutputBuffer getKey() throws IOException { 3267 return rawKey; 3268 } 3269 @Override 3270 public ValueBytes getValue() throws IOException { 3271 return rawValue; 3272 } 3273 @Override 3274 public boolean next() throws IOException { 3275 if (size() == 0) 3276 return false; 3277 if (minSegment != null) { 3278 //minSegment is non-null for all invocations of next except the first 3279 //one. For the first invocation, the priority queue is ready for use 3280 //but for the subsequent invocations, first adjust the queue 3281 adjustPriorityQueue(minSegment); 3282 if (size() == 0) { 3283 minSegment = null; 3284 return false; 3285 } 3286 } 3287 minSegment = (SegmentDescriptor)top(); 3288 long startPos = minSegment.in.getPosition(); // Current position in stream 3289 //save the raw key reference 3290 rawKey = minSegment.getKey(); 3291 //load the raw value. 
Re-use the existing rawValue buffer 3292 if (rawValue == null) { 3293 rawValue = minSegment.in.createValueBytes(); 3294 } 3295 minSegment.nextRawValue(rawValue); 3296 long endPos = minSegment.in.getPosition(); // End position after reading value 3297 updateProgress(endPos - startPos); 3298 return true; 3299 } 3300 3301 @Override 3302 public Progress getProgress() { 3303 return mergeProgress; 3304 } 3305 3306 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3307 long startPos = ms.in.getPosition(); // Current position in stream 3308 boolean hasNext = ms.nextRawKey(); 3309 long endPos = ms.in.getPosition(); // End position after reading key 3310 updateProgress(endPos - startPos); 3311 if (hasNext) { 3312 adjustTop(); 3313 } else { 3314 pop(); 3315 ms.cleanup(); 3316 } 3317 } 3318 3319 private void updateProgress(long bytesProcessed) { 3320 totalBytesProcessed += bytesProcessed; 3321 if (progPerByte > 0) { 3322 mergeProgress.set(totalBytesProcessed * progPerByte); 3323 } 3324 } 3325 3326 /** This is the single level merge that is called multiple times 3327 * depending on the factor size and the number of segments 3328 * @return RawKeyValueIterator 3329 * @throws IOException 3330 */ 3331 public RawKeyValueIterator merge() throws IOException { 3332 //create the MergeStreams from the sorted map created in the constructor 3333 //and dump the final output to a file 3334 int numSegments = sortedSegmentSizes.size(); 3335 int origFactor = factor; 3336 int passNo = 1; 3337 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3338 do { 3339 //get the factor for this pass of merge 3340 factor = getPassFactor(passNo, numSegments); 3341 List<SegmentDescriptor> segmentsToMerge = 3342 new ArrayList<SegmentDescriptor>(); 3343 int segmentsConsidered = 0; 3344 int numSegmentsToConsider = factor; 3345 while (true) { 3346 //extract the smallest 'factor' number of segment pointers from the 3347 //TreeMap. Call cleanup on the empty segments (no key/value data) 3348 SegmentDescriptor[] mStream = 3349 getSegmentDescriptors(numSegmentsToConsider); 3350 for (int i = 0; i < mStream.length; i++) { 3351 if (mStream[i].nextRawKey()) { 3352 segmentsToMerge.add(mStream[i]); 3353 segmentsConsidered++; 3354 // Count the fact that we read some bytes in calling nextRawKey() 3355 updateProgress(mStream[i].in.getPosition()); 3356 } 3357 else { 3358 mStream[i].cleanup(); 3359 numSegments--; //we ignore this segment for the merge 3360 } 3361 } 3362 //if we have the desired number of segments 3363 //or looked at all available segments, we break 3364 if (segmentsConsidered == factor || 3365 sortedSegmentSizes.size() == 0) { 3366 break; 3367 } 3368 3369 numSegmentsToConsider = factor - segmentsConsidered; 3370 } 3371 //feed the streams to the priority queue 3372 initialize(segmentsToMerge.size()); clear(); 3373 for (int i = 0; i < segmentsToMerge.size(); i++) { 3374 put(segmentsToMerge.get(i)); 3375 } 3376 //if we have lesser number of segments remaining, then just return the 3377 //iterator, else do another single level merge 3378 if (numSegments <= factor) { 3379 //calculate the length of the remaining segments. 
Required for 3380 //calculating the merge progress 3381 long totalBytes = 0; 3382 for (int i = 0; i < segmentsToMerge.size(); i++) { 3383 totalBytes += segmentsToMerge.get(i).segmentLength; 3384 } 3385 if (totalBytes != 0) //being paranoid 3386 progPerByte = 1.0f / (float)totalBytes; 3387 //reset factor to what it originally was 3388 factor = origFactor; 3389 return this; 3390 } else { 3391 //we want to spread the creation of temp files on multiple disks if 3392 //available under the space constraints 3393 long approxOutputSize = 0; 3394 for (SegmentDescriptor s : segmentsToMerge) { 3395 approxOutputSize += s.segmentLength + 3396 ChecksumFileSystem.getApproxChkSumLength( 3397 s.segmentLength); 3398 } 3399 Path tmpFilename = 3400 new Path(tmpDir, "intermediate").suffix("." + passNo); 3401 3402 Path outputFile = lDirAlloc.getLocalPathForWrite( 3403 tmpFilename.toString(), 3404 approxOutputSize, conf); 3405 if(LOG.isDebugEnabled()) { 3406 LOG.debug("writing intermediate results to " + outputFile); 3407 } 3408 Writer writer = cloneFileAttributes( 3409 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3410 fs.makeQualified(outputFile), null); 3411 writer.sync = null; //disable sync for temp files 3412 writeFile(this, writer); 3413 writer.close(); 3414 3415 //we finished one single level merge; now clean up the priority 3416 //queue 3417 this.close(); 3418 3419 SegmentDescriptor tempSegment = 3420 new SegmentDescriptor(0, 3421 fs.getFileStatus(outputFile).getLen(), outputFile); 3422 //put the segment back in the TreeMap 3423 sortedSegmentSizes.put(tempSegment, null); 3424 numSegments = sortedSegmentSizes.size(); 3425 passNo++; 3426 } 3427 //only the first-pass merge factor is special (see getPassFactor); reset 3428 //the factor to what it originally was 3429 factor = origFactor; 3430 } while(true); 3431 } 3432 3433 /** Returns the merge factor for a given pass (HADOOP-591). The first pass merges just enough segments so that every subsequent pass can merge a full <code>factor</code> segments; e.g. with 25 segments and a factor of 10, the first pass merges (25-1) % (10-1) + 1 = 7 segments, leaving 19, which two further 10-way passes reduce to a single output. */ 3434 public int getPassFactor(int passNo, int numSegments) { 3435 if (passNo > 1 || numSegments <= factor || factor == 1) 3436 return factor; 3437 int mod = (numSegments - 1) % (factor - 1); 3438 if (mod == 0) 3439 return factor; 3440 return mod + 1; 3441 } 3442 3443 /** Return (& remove) the requested number of segment descriptors from the 3444 * sorted map. 3445 */ 3446 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3447 if (numDescriptors > sortedSegmentSizes.size()) 3448 numDescriptors = sortedSegmentSizes.size(); 3449 SegmentDescriptor[] SegmentDescriptors = 3450 new SegmentDescriptor[numDescriptors]; 3451 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3452 int i = 0; 3453 while (i < numDescriptors) { 3454 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3455 iter.remove(); 3456 } 3457 return SegmentDescriptors; 3458 } 3459 } // SequenceFile.Sorter.MergeQueue 3460 3461 /** This class defines a merge segment. This class can be subclassed to 3462 * provide a customized cleanup method implementation. In this 3463 * implementation, cleanup closes the file handle and deletes the file. 3464 */ 3465 public class SegmentDescriptor implements Comparable { 3466 3467 long segmentOffset; //the start of the segment in the file 3468 long segmentLength; //the length of the segment 3469 Path segmentPathName; //the path name of the file containing the segment 3470 boolean ignoreSync = true; //set to true for temp files 3471 private Reader in = null; 3472 private DataOutputBuffer rawKey = null; //this will hold the current key 3473 private boolean preserveInput = false; //delete input segment files?
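// Illustrative note (added for clarity): Sorter.merge(Path[], ...) above
// builds one descriptor per input file, spanning the whole file, roughly:
//   SegmentDescriptor s =
//       new SegmentDescriptor(0, fs.getFileStatus(p).getLen(), p);
//   s.doSync(); // check sync marks while reading this segment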
3474 3475 /** Constructs a segment 3476 * @param segmentOffset the offset of the segment in the file 3477 * @param segmentLength the length of the segment 3478 * @param segmentPathName the path name of the file containing the segment 3479 */ 3480 public SegmentDescriptor (long segmentOffset, long segmentLength, 3481 Path segmentPathName) { 3482 this.segmentOffset = segmentOffset; 3483 this.segmentLength = segmentLength; 3484 this.segmentPathName = segmentPathName; 3485 } 3486 3487 /** Do the sync checks */ 3488 public void doSync() {ignoreSync = false;} 3489 3490 /** Whether to delete the files when no longer needed */ 3491 public void preserveInput(boolean preserve) { 3492 preserveInput = preserve; 3493 } 3494 3495 public boolean shouldPreserveInput() { 3496 return preserveInput; 3497 } 3498 3499 @Override 3500 public int compareTo(Object o) { 3501 SegmentDescriptor that = (SegmentDescriptor)o; 3502 if (this.segmentLength != that.segmentLength) { 3503 return (this.segmentLength < that.segmentLength ? -1 : 1); 3504 } 3505 if (this.segmentOffset != that.segmentOffset) { 3506 return (this.segmentOffset < that.segmentOffset ? -1 : 1); 3507 } 3508 return (this.segmentPathName.toString()). 3509 compareTo(that.segmentPathName.toString()); 3510 } 3511 3512 @Override 3513 public boolean equals(Object o) { 3514 if (!(o instanceof SegmentDescriptor)) { 3515 return false; 3516 } 3517 SegmentDescriptor that = (SegmentDescriptor)o; 3518 if (this.segmentLength == that.segmentLength && 3519 this.segmentOffset == that.segmentOffset && 3520 this.segmentPathName.toString().equals( 3521 that.segmentPathName.toString())) { 3522 return true; 3523 } 3524 return false; 3525 } 3526 3527 @Override 3528 public int hashCode() { 3529 return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32)); 3530 } 3531 3532 /** Fills up the rawKey object with the key returned by the Reader 3533 * @return true if there is a key returned; false, otherwise 3534 * @throws IOException 3535 */ 3536 public boolean nextRawKey() throws IOException { 3537 if (in == null) { 3538 int bufferSize = getBufferSize(conf); 3539 Reader reader = new Reader(conf, 3540 Reader.file(segmentPathName), 3541 Reader.bufferSize(bufferSize), 3542 Reader.start(segmentOffset), 3543 Reader.length(segmentLength)); 3544 3545 //sometimes we ignore syncs especially for temp merge files 3546 if (ignoreSync) reader.ignoreSync(); 3547 3548 if (reader.getKeyClass() != keyClass) 3549 throw new IOException("wrong key class: " + reader.getKeyClass() + 3550 " is not " + keyClass); 3551 if (reader.getValueClass() != valClass) 3552 throw new IOException("wrong value class: "+reader.getValueClass()+ 3553 " is not " + valClass); 3554 this.in = reader; 3555 rawKey = new DataOutputBuffer(); 3556 } 3557 rawKey.reset(); 3558 int keyLength = 3559 in.nextRawKey(rawKey); 3560 return (keyLength >= 0); 3561 } 3562 3563 /** Fills up the passed rawValue with the value corresponding to the key 3564 * read earlier 3565 * @param rawValue 3566 * @return the length of the value 3567 * @throws IOException 3568 */ 3569 public int nextRawValue(ValueBytes rawValue) throws IOException { 3570 int valLength = in.nextRawValue(rawValue); 3571 return valLength; 3572 } 3573 3574 /** Returns the stored rawKey */ 3575 public DataOutputBuffer getKey() { 3576 return rawKey; 3577 } 3578 3579 /** closes the underlying reader */ 3580 private void close() throws IOException { 3581 this.in.close(); 3582 this.in = null; 3583 } 3584 3585 /** The default cleanup. 
Subclasses can override this with a custom 3586 * cleanup 3587 */ 3588 public void cleanup() throws IOException { 3589 close(); 3590 if (!preserveInput) { 3591 fs.delete(segmentPathName, true); 3592 } 3593 } 3594 } // SequenceFile.Sorter.SegmentDescriptor 3595 3596 /** This class provisions multiple segments contained within a single 3597 * file 3598 */ 3599 private class LinkedSegmentsDescriptor extends SegmentDescriptor { 3600 3601 SegmentContainer parentContainer = null; 3602 3603 /** Constructs a segment 3604 * @param segmentOffset the offset of the segment in the file 3605 * @param segmentLength the length of the segment 3606 * @param segmentPathName the path name of the file containing the segment 3607 * @param parent the parent SegmentContainer that holds the segment 3608 */ 3609 public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 3610 Path segmentPathName, SegmentContainer parent) { 3611 super(segmentOffset, segmentLength, segmentPathName); 3612 this.parentContainer = parent; 3613 } 3614 /** The default cleanup. Subclasses can override this with a custom 3615 * cleanup 3616 */ 3617 @Override 3618 public void cleanup() throws IOException { 3619 super.close(); 3620 if (super.shouldPreserveInput()) return; 3621 parentContainer.cleanup(); 3622 } 3623 3624 @Override 3625 public boolean equals(Object o) { 3626 if (!(o instanceof LinkedSegmentsDescriptor)) { 3627 return false; 3628 } 3629 return super.equals(o); 3630 } 3631 } //SequenceFile.Sorter.LinkedSegmentsDescriptor 3632 3633 /** The class that defines a container for segments to be merged. Primarily 3634 * required to delete temp files as soon as all the contained segments 3635 * have been looked at */ 3636 private class SegmentContainer { 3637 private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups 3638 private int numSegmentsContained; //# of segments contained 3639 private Path inName; //input file from where segments are created 3640 3641 //the list of segments read from the file 3642 private ArrayList <SegmentDescriptor> segments = 3643 new ArrayList <SegmentDescriptor>(); 3644 /** This constructor is there primarily to serve the sort routine that 3645 * generates a single output file with an associated index file */ 3646 public SegmentContainer(Path inName, Path indexIn) throws IOException { 3647 //get the segments from indexIn 3648 FSDataInputStream fsIndexIn = fs.open(indexIn); 3649 long end = fs.getFileStatus(indexIn).getLen(); 3650 while (fsIndexIn.getPos() < end) { 3651 long segmentOffset = WritableUtils.readVLong(fsIndexIn); 3652 long segmentLength = WritableUtils.readVLong(fsIndexIn); 3653 Path segmentName = inName; 3654 segments.add(new LinkedSegmentsDescriptor(segmentOffset, 3655 segmentLength, segmentName, this)); 3656 } 3657 fsIndexIn.close(); 3658 fs.delete(indexIn, true); 3659 numSegmentsContained = segments.size(); 3660 this.inName = inName; 3661 } 3662 3663 public List <SegmentDescriptor> getSegmentList() { 3664 return segments; 3665 } 3666 public void cleanup() throws IOException { 3667 numSegmentsCleanedUp++; 3668 if (numSegmentsCleanedUp == numSegmentsContained) { 3669 fs.delete(inName, true); 3670 } 3671 } 3672 } //SequenceFile.Sorter.SegmentContainer 3673 3674 } // SequenceFile.Sorter 3675 3676 } // SequenceFile
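// Usage sketch (illustrative only; the paths, types and sizes below are
// assumptions): sorting a pair of SequenceFiles with the Sorter defined above.
//
//   Configuration conf = new Configuration();
//   FileSystem fs = FileSystem.get(conf);
//   SequenceFile.Sorter sorter =
//       new SequenceFile.Sorter(fs, Text.class, Text.class, conf);
//   sorter.setFactor(50);                // merge at most 50 segments per pass
//   sorter.setMemory(64 * 1024 * 1024);  // 64 MB in-memory sort buffer
//   sorter.sort(new Path[] { new Path("/data/a.seq"), new Path("/data/b.seq") },
//               new Path("/data/sorted.seq"), false);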