001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.*; 022import java.util.*; 023import java.rmi.server.UID; 024import java.security.MessageDigest; 025import org.apache.commons.logging.*; 026import org.apache.hadoop.util.Options; 027import org.apache.hadoop.fs.*; 028import org.apache.hadoop.fs.Options.CreateOpts; 029import org.apache.hadoop.io.compress.CodecPool; 030import org.apache.hadoop.io.compress.CompressionCodec; 031import org.apache.hadoop.io.compress.CompressionInputStream; 032import org.apache.hadoop.io.compress.CompressionOutputStream; 033import org.apache.hadoop.io.compress.Compressor; 034import org.apache.hadoop.io.compress.Decompressor; 035import org.apache.hadoop.io.compress.DefaultCodec; 036import org.apache.hadoop.io.compress.GzipCodec; 037import org.apache.hadoop.io.compress.zlib.ZlibFactory; 038import org.apache.hadoop.io.serializer.Deserializer; 039import org.apache.hadoop.io.serializer.Serializer; 040import org.apache.hadoop.io.serializer.SerializationFactory; 041import org.apache.hadoop.classification.InterfaceAudience; 042import org.apache.hadoop.classification.InterfaceStability; 043import org.apache.hadoop.conf.*; 044import org.apache.hadoop.util.Progressable; 045import org.apache.hadoop.util.Progress; 046import org.apache.hadoop.util.ReflectionUtils; 047import org.apache.hadoop.util.NativeCodeLoader; 048import org.apache.hadoop.util.MergeSort; 049import org.apache.hadoop.util.PriorityQueue; 050import org.apache.hadoop.util.Time; 051 052/** 053 * <code>SequenceFile</code>s are flat files consisting of binary key/value 054 * pairs. 055 * 056 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer}, 057 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing, 058 * reading and sorting respectively.</p> 059 * 060 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 061 * {@link CompressionType} used to compress key/value pairs: 062 * <ol> 063 * <li> 064 * <code>Writer</code> : Uncompressed records. 065 * </li> 066 * <li> 067 * <code>RecordCompressWriter</code> : Record-compressed files, only compress 068 * values. 069 * </li> 070 * <li> 071 * <code>BlockCompressWriter</code> : Block-compressed files, both keys & 072 * values are collected in 'blocks' 073 * separately and compressed. The size of 074 * the 'block' is configurable. 075 * </ol> 076 * 077 * <p>The actual compression algorithm used to compress key and/or values can be 078 * specified by using the appropriate {@link CompressionCodec}.</p> 079 * 080 * <p>The recommended way is to use the static <tt>createWriter</tt> methods 081 * provided by the <code>SequenceFile</code> to chose the preferred format.</p> 082 * 083 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the 084 * above <code>SequenceFile</code> formats.</p> 085 * 086 * <h4 id="Formats">SequenceFile Formats</h4> 087 * 088 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s 089 * depending on the <code>CompressionType</code> specified. All of them share a 090 * <a href="#Header">common header</a> described below. 091 * 092 * <h5 id="Header">SequenceFile Header</h5> 093 * <ul> 094 * <li> 095 * version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 096 * version number (e.g. SEQ4 or SEQ6) 097 * </li> 098 * <li> 099 * keyClassName -key class 100 * </li> 101 * <li> 102 * valueClassName - value class 103 * </li> 104 * <li> 105 * compression - A boolean which specifies if compression is turned on for 106 * keys/values in this file. 107 * </li> 108 * <li> 109 * blockCompression - A boolean which specifies if block-compression is 110 * turned on for keys/values in this file. 111 * </li> 112 * <li> 113 * compression codec - <code>CompressionCodec</code> class which is used for 114 * compression of keys and/or values (if compression is 115 * enabled). 116 * </li> 117 * <li> 118 * metadata - {@link Metadata} for this file. 119 * </li> 120 * <li> 121 * sync - A sync marker to denote end of the header. 122 * </li> 123 * </ul> 124 * 125 * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5> 126 * <ul> 127 * <li> 128 * <a href="#Header">Header</a> 129 * </li> 130 * <li> 131 * Record 132 * <ul> 133 * <li>Record length</li> 134 * <li>Key length</li> 135 * <li>Key</li> 136 * <li>Value</li> 137 * </ul> 138 * </li> 139 * <li> 140 * A sync-marker every few <code>100</code> bytes or so. 141 * </li> 142 * </ul> 143 * 144 * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5> 145 * <ul> 146 * <li> 147 * <a href="#Header">Header</a> 148 * </li> 149 * <li> 150 * Record 151 * <ul> 152 * <li>Record length</li> 153 * <li>Key length</li> 154 * <li>Key</li> 155 * <li><i>Compressed</i> Value</li> 156 * </ul> 157 * </li> 158 * <li> 159 * A sync-marker every few <code>100</code> bytes or so. 160 * </li> 161 * </ul> 162 * 163 * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5> 164 * <ul> 165 * <li> 166 * <a href="#Header">Header</a> 167 * </li> 168 * <li> 169 * Record <i>Block</i> 170 * <ul> 171 * <li>Uncompressed number of records in the block</li> 172 * <li>Compressed key-lengths block-size</li> 173 * <li>Compressed key-lengths block</li> 174 * <li>Compressed keys block-size</li> 175 * <li>Compressed keys block</li> 176 * <li>Compressed value-lengths block-size</li> 177 * <li>Compressed value-lengths block</li> 178 * <li>Compressed values block-size</li> 179 * <li>Compressed values block</li> 180 * </ul> 181 * </li> 182 * <li> 183 * A sync-marker every block. 184 * </li> 185 * </ul> 186 * 187 * <p>The compressed blocks of key lengths and value lengths consist of the 188 * actual lengths of individual keys/values encoded in ZeroCompressedInteger 189 * format.</p> 190 * 191 * @see CompressionCodec 192 */ 193@InterfaceAudience.Public 194@InterfaceStability.Stable 195public class SequenceFile { 196 private static final Log LOG = LogFactory.getLog(SequenceFile.class); 197 198 private SequenceFile() {} // no public ctor 199 200 private static final byte BLOCK_COMPRESS_VERSION = (byte)4; 201 private static final byte CUSTOM_COMPRESS_VERSION = (byte)5; 202 private static final byte VERSION_WITH_METADATA = (byte)6; 203 private static byte[] VERSION = new byte[] { 204 (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA 205 }; 206 207 private static final int SYNC_ESCAPE = -1; // "length" of sync entries 208 private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash 209 private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash 210 211 /** The number of bytes between sync points.*/ 212 public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 213 214 /** 215 * The compression type used to compress key/value pairs in the 216 * {@link SequenceFile}. 217 * 218 * @see SequenceFile.Writer 219 */ 220 public static enum CompressionType { 221 /** Do not compress records. */ 222 NONE, 223 /** Compress values only, each separately. */ 224 RECORD, 225 /** Compress sequences of records together in blocks. */ 226 BLOCK 227 } 228 229 /** 230 * Get the compression type for the reduce outputs 231 * @param job the job config to look in 232 * @return the kind of compression to use 233 */ 234 static public CompressionType getDefaultCompressionType(Configuration job) { 235 String name = job.get("io.seqfile.compression.type"); 236 return name == null ? CompressionType.RECORD : 237 CompressionType.valueOf(name); 238 } 239 240 /** 241 * Set the default compression type for sequence files. 242 * @param job the configuration to modify 243 * @param val the new compression type (none, block, record) 244 */ 245 static public void setDefaultCompressionType(Configuration job, 246 CompressionType val) { 247 job.set("io.seqfile.compression.type", val.toString()); 248 } 249 250 /** 251 * Create a new Writer with the given options. 252 * @param conf the configuration to use 253 * @param opts the options to create the file with 254 * @return a new Writer 255 * @throws IOException 256 */ 257 public static Writer createWriter(Configuration conf, Writer.Option... opts 258 ) throws IOException { 259 Writer.CompressionOption compressionOption = 260 Options.getOption(Writer.CompressionOption.class, opts); 261 CompressionType kind; 262 if (compressionOption != null) { 263 kind = compressionOption.getValue(); 264 } else { 265 kind = getDefaultCompressionType(conf); 266 opts = Options.prependOptions(opts, Writer.compression(kind)); 267 } 268 switch (kind) { 269 default: 270 case NONE: 271 return new Writer(conf, opts); 272 case RECORD: 273 return new RecordCompressWriter(conf, opts); 274 case BLOCK: 275 return new BlockCompressWriter(conf, opts); 276 } 277 } 278 279 /** 280 * Construct the preferred type of SequenceFile Writer. 281 * @param fs The configured filesystem. 282 * @param conf The configuration. 283 * @param name The name of the file. 284 * @param keyClass The 'key' type. 285 * @param valClass The 'value' type. 286 * @return Returns the handle to the constructed SequenceFile Writer. 287 * @throws IOException 288 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 289 * instead. 290 */ 291 @Deprecated 292 public static Writer 293 createWriter(FileSystem fs, Configuration conf, Path name, 294 Class keyClass, Class valClass) throws IOException { 295 return createWriter(conf, Writer.filesystem(fs), 296 Writer.file(name), Writer.keyClass(keyClass), 297 Writer.valueClass(valClass)); 298 } 299 300 /** 301 * Construct the preferred type of SequenceFile Writer. 302 * @param fs The configured filesystem. 303 * @param conf The configuration. 304 * @param name The name of the file. 305 * @param keyClass The 'key' type. 306 * @param valClass The 'value' type. 307 * @param compressionType The compression type. 308 * @return Returns the handle to the constructed SequenceFile Writer. 309 * @throws IOException 310 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 311 * instead. 312 */ 313 @Deprecated 314 public static Writer 315 createWriter(FileSystem fs, Configuration conf, Path name, 316 Class keyClass, Class valClass, 317 CompressionType compressionType) throws IOException { 318 return createWriter(conf, Writer.filesystem(fs), 319 Writer.file(name), Writer.keyClass(keyClass), 320 Writer.valueClass(valClass), 321 Writer.compression(compressionType)); 322 } 323 324 /** 325 * Construct the preferred type of SequenceFile Writer. 326 * @param fs The configured filesystem. 327 * @param conf The configuration. 328 * @param name The name of the file. 329 * @param keyClass The 'key' type. 330 * @param valClass The 'value' type. 331 * @param compressionType The compression type. 332 * @param progress The Progressable object to track progress. 333 * @return Returns the handle to the constructed SequenceFile Writer. 334 * @throws IOException 335 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 336 * instead. 337 */ 338 @Deprecated 339 public static Writer 340 createWriter(FileSystem fs, Configuration conf, Path name, 341 Class keyClass, Class valClass, CompressionType compressionType, 342 Progressable progress) throws IOException { 343 return createWriter(conf, Writer.file(name), 344 Writer.filesystem(fs), 345 Writer.keyClass(keyClass), 346 Writer.valueClass(valClass), 347 Writer.compression(compressionType), 348 Writer.progressable(progress)); 349 } 350 351 /** 352 * Construct the preferred type of SequenceFile Writer. 353 * @param fs The configured filesystem. 354 * @param conf The configuration. 355 * @param name The name of the file. 356 * @param keyClass The 'key' type. 357 * @param valClass The 'value' type. 358 * @param compressionType The compression type. 359 * @param codec The compression codec. 360 * @return Returns the handle to the constructed SequenceFile Writer. 361 * @throws IOException 362 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 363 * instead. 364 */ 365 @Deprecated 366 public static Writer 367 createWriter(FileSystem fs, Configuration conf, Path name, 368 Class keyClass, Class valClass, CompressionType compressionType, 369 CompressionCodec codec) throws IOException { 370 return createWriter(conf, Writer.file(name), 371 Writer.filesystem(fs), 372 Writer.keyClass(keyClass), 373 Writer.valueClass(valClass), 374 Writer.compression(compressionType, codec)); 375 } 376 377 /** 378 * Construct the preferred type of SequenceFile Writer. 379 * @param fs The configured filesystem. 380 * @param conf The configuration. 381 * @param name The name of the file. 382 * @param keyClass The 'key' type. 383 * @param valClass The 'value' type. 384 * @param compressionType The compression type. 385 * @param codec The compression codec. 386 * @param progress The Progressable object to track progress. 387 * @param metadata The metadata of the file. 388 * @return Returns the handle to the constructed SequenceFile Writer. 389 * @throws IOException 390 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 391 * instead. 392 */ 393 @Deprecated 394 public static Writer 395 createWriter(FileSystem fs, Configuration conf, Path name, 396 Class keyClass, Class valClass, 397 CompressionType compressionType, CompressionCodec codec, 398 Progressable progress, Metadata metadata) throws IOException { 399 return createWriter(conf, Writer.file(name), 400 Writer.filesystem(fs), 401 Writer.keyClass(keyClass), 402 Writer.valueClass(valClass), 403 Writer.compression(compressionType, codec), 404 Writer.progressable(progress), 405 Writer.metadata(metadata)); 406 } 407 408 /** 409 * Construct the preferred type of SequenceFile Writer. 410 * @param fs The configured filesystem. 411 * @param conf The configuration. 412 * @param name The name of the file. 413 * @param keyClass The 'key' type. 414 * @param valClass The 'value' type. 415 * @param bufferSize buffer size for the underlaying outputstream. 416 * @param replication replication factor for the file. 417 * @param blockSize block size for the file. 418 * @param compressionType The compression type. 419 * @param codec The compression codec. 420 * @param progress The Progressable object to track progress. 421 * @param metadata The metadata of the file. 422 * @return Returns the handle to the constructed SequenceFile Writer. 423 * @throws IOException 424 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 425 * instead. 426 */ 427 @Deprecated 428 public static Writer 429 createWriter(FileSystem fs, Configuration conf, Path name, 430 Class keyClass, Class valClass, int bufferSize, 431 short replication, long blockSize, 432 CompressionType compressionType, CompressionCodec codec, 433 Progressable progress, Metadata metadata) throws IOException { 434 return createWriter(conf, Writer.file(name), 435 Writer.filesystem(fs), 436 Writer.keyClass(keyClass), 437 Writer.valueClass(valClass), 438 Writer.bufferSize(bufferSize), 439 Writer.replication(replication), 440 Writer.blockSize(blockSize), 441 Writer.compression(compressionType, codec), 442 Writer.progressable(progress), 443 Writer.metadata(metadata)); 444 } 445 446 /** 447 * Construct the preferred type of SequenceFile Writer. 448 * @param fs The configured filesystem. 449 * @param conf The configuration. 450 * @param name The name of the file. 451 * @param keyClass The 'key' type. 452 * @param valClass The 'value' type. 453 * @param bufferSize buffer size for the underlaying outputstream. 454 * @param replication replication factor for the file. 455 * @param blockSize block size for the file. 456 * @param createParent create parent directory if non-existent 457 * @param compressionType The compression type. 458 * @param codec The compression codec. 459 * @param metadata The metadata of the file. 460 * @return Returns the handle to the constructed SequenceFile Writer. 461 * @throws IOException 462 */ 463 @Deprecated 464 public static Writer 465 createWriter(FileSystem fs, Configuration conf, Path name, 466 Class keyClass, Class valClass, int bufferSize, 467 short replication, long blockSize, boolean createParent, 468 CompressionType compressionType, CompressionCodec codec, 469 Metadata metadata) throws IOException { 470 return createWriter(FileContext.getFileContext(fs.getUri(), conf), 471 conf, name, keyClass, valClass, compressionType, codec, 472 metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE), 473 CreateOpts.bufferSize(bufferSize), 474 createParent ? CreateOpts.createParent() 475 : CreateOpts.donotCreateParent(), 476 CreateOpts.repFac(replication), 477 CreateOpts.blockSize(blockSize) 478 ); 479 } 480 481 /** 482 * Construct the preferred type of SequenceFile Writer. 483 * @param fc The context for the specified file. 484 * @param conf The configuration. 485 * @param name The name of the file. 486 * @param keyClass The 'key' type. 487 * @param valClass The 'value' type. 488 * @param compressionType The compression type. 489 * @param codec The compression codec. 490 * @param metadata The metadata of the file. 491 * @param createFlag gives the semantics of create: overwrite, append etc. 492 * @param opts file creation options; see {@link CreateOpts}. 493 * @return Returns the handle to the constructed SequenceFile Writer. 494 * @throws IOException 495 */ 496 public static Writer 497 createWriter(FileContext fc, Configuration conf, Path name, 498 Class keyClass, Class valClass, 499 CompressionType compressionType, CompressionCodec codec, 500 Metadata metadata, 501 final EnumSet<CreateFlag> createFlag, CreateOpts... opts) 502 throws IOException { 503 return createWriter(conf, fc.create(name, createFlag, opts), 504 keyClass, valClass, compressionType, codec, metadata).ownStream(); 505 } 506 507 /** 508 * Construct the preferred type of SequenceFile Writer. 509 * @param fs The configured filesystem. 510 * @param conf The configuration. 511 * @param name The name of the file. 512 * @param keyClass The 'key' type. 513 * @param valClass The 'value' type. 514 * @param compressionType The compression type. 515 * @param codec The compression codec. 516 * @param progress The Progressable object to track progress. 517 * @return Returns the handle to the constructed SequenceFile Writer. 518 * @throws IOException 519 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 520 * instead. 521 */ 522 @Deprecated 523 public static Writer 524 createWriter(FileSystem fs, Configuration conf, Path name, 525 Class keyClass, Class valClass, 526 CompressionType compressionType, CompressionCodec codec, 527 Progressable progress) throws IOException { 528 return createWriter(conf, Writer.file(name), 529 Writer.filesystem(fs), 530 Writer.keyClass(keyClass), 531 Writer.valueClass(valClass), 532 Writer.compression(compressionType, codec), 533 Writer.progressable(progress)); 534 } 535 536 /** 537 * Construct the preferred type of 'raw' SequenceFile Writer. 538 * @param conf The configuration. 539 * @param out The stream on top which the writer is to be constructed. 540 * @param keyClass The 'key' type. 541 * @param valClass The 'value' type. 542 * @param compressionType The compression type. 543 * @param codec The compression codec. 544 * @param metadata The metadata of the file. 545 * @return Returns the handle to the constructed SequenceFile Writer. 546 * @throws IOException 547 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 548 * instead. 549 */ 550 @Deprecated 551 public static Writer 552 createWriter(Configuration conf, FSDataOutputStream out, 553 Class keyClass, Class valClass, 554 CompressionType compressionType, 555 CompressionCodec codec, Metadata metadata) throws IOException { 556 return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), 557 Writer.valueClass(valClass), 558 Writer.compression(compressionType, codec), 559 Writer.metadata(metadata)); 560 } 561 562 /** 563 * Construct the preferred type of 'raw' SequenceFile Writer. 564 * @param conf The configuration. 565 * @param out The stream on top which the writer is to be constructed. 566 * @param keyClass The 'key' type. 567 * @param valClass The 'value' type. 568 * @param compressionType The compression type. 569 * @param codec The compression codec. 570 * @return Returns the handle to the constructed SequenceFile Writer. 571 * @throws IOException 572 * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} 573 * instead. 574 */ 575 @Deprecated 576 public static Writer 577 createWriter(Configuration conf, FSDataOutputStream out, 578 Class keyClass, Class valClass, CompressionType compressionType, 579 CompressionCodec codec) throws IOException { 580 return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), 581 Writer.valueClass(valClass), 582 Writer.compression(compressionType, codec)); 583 } 584 585 586 /** The interface to 'raw' values of SequenceFiles. */ 587 public static interface ValueBytes { 588 589 /** Writes the uncompressed bytes to the outStream. 590 * @param outStream : Stream to write uncompressed bytes into. 591 * @throws IOException 592 */ 593 public void writeUncompressedBytes(DataOutputStream outStream) 594 throws IOException; 595 596 /** Write compressed bytes to outStream. 597 * Note: that it will NOT compress the bytes if they are not compressed. 598 * @param outStream : Stream to write compressed bytes into. 599 */ 600 public void writeCompressedBytes(DataOutputStream outStream) 601 throws IllegalArgumentException, IOException; 602 603 /** 604 * Size of stored data. 605 */ 606 public int getSize(); 607 } 608 609 private static class UncompressedBytes implements ValueBytes { 610 private int dataSize; 611 private byte[] data; 612 613 private UncompressedBytes() { 614 data = null; 615 dataSize = 0; 616 } 617 618 private void reset(DataInputStream in, int length) throws IOException { 619 if (data == null) { 620 data = new byte[length]; 621 } else if (length > data.length) { 622 data = new byte[Math.max(length, data.length * 2)]; 623 } 624 dataSize = -1; 625 in.readFully(data, 0, length); 626 dataSize = length; 627 } 628 629 @Override 630 public int getSize() { 631 return dataSize; 632 } 633 634 @Override 635 public void writeUncompressedBytes(DataOutputStream outStream) 636 throws IOException { 637 outStream.write(data, 0, dataSize); 638 } 639 640 @Override 641 public void writeCompressedBytes(DataOutputStream outStream) 642 throws IllegalArgumentException, IOException { 643 throw 644 new IllegalArgumentException("UncompressedBytes cannot be compressed!"); 645 } 646 647 } // UncompressedBytes 648 649 private static class CompressedBytes implements ValueBytes { 650 private int dataSize; 651 private byte[] data; 652 DataInputBuffer rawData = null; 653 CompressionCodec codec = null; 654 CompressionInputStream decompressedStream = null; 655 656 private CompressedBytes(CompressionCodec codec) { 657 data = null; 658 dataSize = 0; 659 this.codec = codec; 660 } 661 662 private void reset(DataInputStream in, int length) throws IOException { 663 if (data == null) { 664 data = new byte[length]; 665 } else if (length > data.length) { 666 data = new byte[Math.max(length, data.length * 2)]; 667 } 668 dataSize = -1; 669 in.readFully(data, 0, length); 670 dataSize = length; 671 } 672 673 @Override 674 public int getSize() { 675 return dataSize; 676 } 677 678 @Override 679 public void writeUncompressedBytes(DataOutputStream outStream) 680 throws IOException { 681 if (decompressedStream == null) { 682 rawData = new DataInputBuffer(); 683 decompressedStream = codec.createInputStream(rawData); 684 } else { 685 decompressedStream.resetState(); 686 } 687 rawData.reset(data, 0, dataSize); 688 689 byte[] buffer = new byte[8192]; 690 int bytesRead = 0; 691 while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) { 692 outStream.write(buffer, 0, bytesRead); 693 } 694 } 695 696 @Override 697 public void writeCompressedBytes(DataOutputStream outStream) 698 throws IllegalArgumentException, IOException { 699 outStream.write(data, 0, dataSize); 700 } 701 702 } // CompressedBytes 703 704 /** 705 * The class encapsulating with the metadata of a file. 706 * The metadata of a file is a list of attribute name/value 707 * pairs of Text type. 708 * 709 */ 710 public static class Metadata implements Writable { 711 712 private TreeMap<Text, Text> theMetadata; 713 714 public Metadata() { 715 this(new TreeMap<Text, Text>()); 716 } 717 718 public Metadata(TreeMap<Text, Text> arg) { 719 if (arg == null) { 720 this.theMetadata = new TreeMap<Text, Text>(); 721 } else { 722 this.theMetadata = arg; 723 } 724 } 725 726 public Text get(Text name) { 727 return this.theMetadata.get(name); 728 } 729 730 public void set(Text name, Text value) { 731 this.theMetadata.put(name, value); 732 } 733 734 public TreeMap<Text, Text> getMetadata() { 735 return new TreeMap<Text, Text>(this.theMetadata); 736 } 737 738 @Override 739 public void write(DataOutput out) throws IOException { 740 out.writeInt(this.theMetadata.size()); 741 Iterator<Map.Entry<Text, Text>> iter = 742 this.theMetadata.entrySet().iterator(); 743 while (iter.hasNext()) { 744 Map.Entry<Text, Text> en = iter.next(); 745 en.getKey().write(out); 746 en.getValue().write(out); 747 } 748 } 749 750 @Override 751 public void readFields(DataInput in) throws IOException { 752 int sz = in.readInt(); 753 if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object"); 754 this.theMetadata = new TreeMap<Text, Text>(); 755 for (int i = 0; i < sz; i++) { 756 Text key = new Text(); 757 Text val = new Text(); 758 key.readFields(in); 759 val.readFields(in); 760 this.theMetadata.put(key, val); 761 } 762 } 763 764 @Override 765 public boolean equals(Object other) { 766 if (other == null) { 767 return false; 768 } 769 if (other.getClass() != this.getClass()) { 770 return false; 771 } else { 772 return equals((Metadata)other); 773 } 774 } 775 776 public boolean equals(Metadata other) { 777 if (other == null) return false; 778 if (this.theMetadata.size() != other.theMetadata.size()) { 779 return false; 780 } 781 Iterator<Map.Entry<Text, Text>> iter1 = 782 this.theMetadata.entrySet().iterator(); 783 Iterator<Map.Entry<Text, Text>> iter2 = 784 other.theMetadata.entrySet().iterator(); 785 while (iter1.hasNext() && iter2.hasNext()) { 786 Map.Entry<Text, Text> en1 = iter1.next(); 787 Map.Entry<Text, Text> en2 = iter2.next(); 788 if (!en1.getKey().equals(en2.getKey())) { 789 return false; 790 } 791 if (!en1.getValue().equals(en2.getValue())) { 792 return false; 793 } 794 } 795 if (iter1.hasNext() || iter2.hasNext()) { 796 return false; 797 } 798 return true; 799 } 800 801 @Override 802 public int hashCode() { 803 assert false : "hashCode not designed"; 804 return 42; // any arbitrary constant will do 805 } 806 807 @Override 808 public String toString() { 809 StringBuilder sb = new StringBuilder(); 810 sb.append("size: ").append(this.theMetadata.size()).append("\n"); 811 Iterator<Map.Entry<Text, Text>> iter = 812 this.theMetadata.entrySet().iterator(); 813 while (iter.hasNext()) { 814 Map.Entry<Text, Text> en = iter.next(); 815 sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString()); 816 sb.append("\n"); 817 } 818 return sb.toString(); 819 } 820 } 821 822 /** Write key/value pairs to a sequence-format file. */ 823 public static class Writer implements java.io.Closeable, Syncable { 824 private Configuration conf; 825 FSDataOutputStream out; 826 boolean ownOutputStream = true; 827 DataOutputBuffer buffer = new DataOutputBuffer(); 828 829 Class keyClass; 830 Class valClass; 831 832 private final CompressionType compress; 833 CompressionCodec codec = null; 834 CompressionOutputStream deflateFilter = null; 835 DataOutputStream deflateOut = null; 836 Metadata metadata = null; 837 Compressor compressor = null; 838 839 private boolean appendMode = false; 840 841 protected Serializer keySerializer; 842 protected Serializer uncompressedValSerializer; 843 protected Serializer compressedValSerializer; 844 845 // Insert a globally unique 16-byte value every few entries, so that one 846 // can seek into the middle of a file and then synchronize with record 847 // starts and ends by scanning for this value. 848 long lastSyncPos; // position of last sync 849 byte[] sync; // 16 random bytes 850 { 851 try { 852 MessageDigest digester = MessageDigest.getInstance("MD5"); 853 long time = Time.now(); 854 digester.update((new UID()+"@"+time).getBytes()); 855 sync = digester.digest(); 856 } catch (Exception e) { 857 throw new RuntimeException(e); 858 } 859 } 860 861 public static interface Option {} 862 863 static class FileOption extends Options.PathOption 864 implements Option { 865 FileOption(Path path) { 866 super(path); 867 } 868 } 869 870 /** 871 * @deprecated only used for backwards-compatibility in the createWriter methods 872 * that take FileSystem. 873 */ 874 @Deprecated 875 private static class FileSystemOption implements Option { 876 private final FileSystem value; 877 protected FileSystemOption(FileSystem value) { 878 this.value = value; 879 } 880 public FileSystem getValue() { 881 return value; 882 } 883 } 884 885 static class StreamOption extends Options.FSDataOutputStreamOption 886 implements Option { 887 StreamOption(FSDataOutputStream stream) { 888 super(stream); 889 } 890 } 891 892 static class BufferSizeOption extends Options.IntegerOption 893 implements Option { 894 BufferSizeOption(int value) { 895 super(value); 896 } 897 } 898 899 static class BlockSizeOption extends Options.LongOption implements Option { 900 BlockSizeOption(long value) { 901 super(value); 902 } 903 } 904 905 static class ReplicationOption extends Options.IntegerOption 906 implements Option { 907 ReplicationOption(int value) { 908 super(value); 909 } 910 } 911 912 static class AppendIfExistsOption extends Options.BooleanOption implements 913 Option { 914 AppendIfExistsOption(boolean value) { 915 super(value); 916 } 917 } 918 919 static class KeyClassOption extends Options.ClassOption implements Option { 920 KeyClassOption(Class<?> value) { 921 super(value); 922 } 923 } 924 925 static class ValueClassOption extends Options.ClassOption 926 implements Option { 927 ValueClassOption(Class<?> value) { 928 super(value); 929 } 930 } 931 932 static class MetadataOption implements Option { 933 private final Metadata value; 934 MetadataOption(Metadata value) { 935 this.value = value; 936 } 937 Metadata getValue() { 938 return value; 939 } 940 } 941 942 static class ProgressableOption extends Options.ProgressableOption 943 implements Option { 944 ProgressableOption(Progressable value) { 945 super(value); 946 } 947 } 948 949 private static class CompressionOption implements Option { 950 private final CompressionType value; 951 private final CompressionCodec codec; 952 CompressionOption(CompressionType value) { 953 this(value, null); 954 } 955 CompressionOption(CompressionType value, CompressionCodec codec) { 956 this.value = value; 957 this.codec = (CompressionType.NONE != value && null == codec) 958 ? new DefaultCodec() 959 : codec; 960 } 961 CompressionType getValue() { 962 return value; 963 } 964 CompressionCodec getCodec() { 965 return codec; 966 } 967 } 968 969 public static Option file(Path value) { 970 return new FileOption(value); 971 } 972 973 /** 974 * @deprecated only used for backwards-compatibility in the createWriter methods 975 * that take FileSystem. 976 */ 977 @Deprecated 978 private static Option filesystem(FileSystem fs) { 979 return new SequenceFile.Writer.FileSystemOption(fs); 980 } 981 982 public static Option bufferSize(int value) { 983 return new BufferSizeOption(value); 984 } 985 986 public static Option stream(FSDataOutputStream value) { 987 return new StreamOption(value); 988 } 989 990 public static Option replication(short value) { 991 return new ReplicationOption(value); 992 } 993 994 public static Option appendIfExists(boolean value) { 995 return new AppendIfExistsOption(value); 996 } 997 998 public static Option blockSize(long value) { 999 return new BlockSizeOption(value); 1000 } 1001 1002 public static Option progressable(Progressable value) { 1003 return new ProgressableOption(value); 1004 } 1005 1006 public static Option keyClass(Class<?> value) { 1007 return new KeyClassOption(value); 1008 } 1009 1010 public static Option valueClass(Class<?> value) { 1011 return new ValueClassOption(value); 1012 } 1013 1014 public static Option metadata(Metadata value) { 1015 return new MetadataOption(value); 1016 } 1017 1018 public static Option compression(CompressionType value) { 1019 return new CompressionOption(value); 1020 } 1021 1022 public static Option compression(CompressionType value, 1023 CompressionCodec codec) { 1024 return new CompressionOption(value, codec); 1025 } 1026 1027 /** 1028 * Construct a uncompressed writer from a set of options. 1029 * @param conf the configuration to use 1030 * @param options the options used when creating the writer 1031 * @throws IOException if it fails 1032 */ 1033 Writer(Configuration conf, 1034 Option... opts) throws IOException { 1035 BlockSizeOption blockSizeOption = 1036 Options.getOption(BlockSizeOption.class, opts); 1037 BufferSizeOption bufferSizeOption = 1038 Options.getOption(BufferSizeOption.class, opts); 1039 ReplicationOption replicationOption = 1040 Options.getOption(ReplicationOption.class, opts); 1041 ProgressableOption progressOption = 1042 Options.getOption(ProgressableOption.class, opts); 1043 FileOption fileOption = Options.getOption(FileOption.class, opts); 1044 AppendIfExistsOption appendIfExistsOption = Options.getOption( 1045 AppendIfExistsOption.class, opts); 1046 FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts); 1047 StreamOption streamOption = Options.getOption(StreamOption.class, opts); 1048 KeyClassOption keyClassOption = 1049 Options.getOption(KeyClassOption.class, opts); 1050 ValueClassOption valueClassOption = 1051 Options.getOption(ValueClassOption.class, opts); 1052 MetadataOption metadataOption = 1053 Options.getOption(MetadataOption.class, opts); 1054 CompressionOption compressionTypeOption = 1055 Options.getOption(CompressionOption.class, opts); 1056 // check consistency of options 1057 if ((fileOption == null) == (streamOption == null)) { 1058 throw new IllegalArgumentException("file or stream must be specified"); 1059 } 1060 if (fileOption == null && (blockSizeOption != null || 1061 bufferSizeOption != null || 1062 replicationOption != null || 1063 progressOption != null)) { 1064 throw new IllegalArgumentException("file modifier options not " + 1065 "compatible with stream"); 1066 } 1067 1068 FSDataOutputStream out; 1069 boolean ownStream = fileOption != null; 1070 if (ownStream) { 1071 Path p = fileOption.getValue(); 1072 FileSystem fs; 1073 if (fsOption != null) { 1074 fs = fsOption.getValue(); 1075 } else { 1076 fs = p.getFileSystem(conf); 1077 } 1078 int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : 1079 bufferSizeOption.getValue(); 1080 short replication = replicationOption == null ? 1081 fs.getDefaultReplication(p) : 1082 (short) replicationOption.getValue(); 1083 long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : 1084 blockSizeOption.getValue(); 1085 Progressable progress = progressOption == null ? null : 1086 progressOption.getValue(); 1087 1088 if (appendIfExistsOption != null && appendIfExistsOption.getValue() 1089 && fs.exists(p)) { 1090 1091 // Read the file and verify header details 1092 SequenceFile.Reader reader = new SequenceFile.Reader(conf, 1093 SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption()); 1094 try { 1095 1096 if (keyClassOption.getValue() != reader.getKeyClass() 1097 || valueClassOption.getValue() != reader.getValueClass()) { 1098 throw new IllegalArgumentException( 1099 "Key/value class provided does not match the file"); 1100 } 1101 1102 if (reader.getVersion() != VERSION[3]) { 1103 throw new VersionMismatchException(VERSION[3], 1104 reader.getVersion()); 1105 } 1106 1107 if (metadataOption != null) { 1108 LOG.info("MetaData Option is ignored during append"); 1109 } 1110 metadataOption = (MetadataOption) SequenceFile.Writer 1111 .metadata(reader.getMetadata()); 1112 1113 CompressionOption readerCompressionOption = new CompressionOption( 1114 reader.getCompressionType(), reader.getCompressionCodec()); 1115 1116 if (readerCompressionOption.value != compressionTypeOption.value 1117 || !readerCompressionOption.codec.getClass().getName() 1118 .equals(compressionTypeOption.codec.getClass().getName())) { 1119 throw new IllegalArgumentException( 1120 "Compression option provided does not match the file"); 1121 } 1122 1123 sync = reader.getSync(); 1124 1125 } finally { 1126 reader.close(); 1127 } 1128 1129 out = fs.append(p, bufferSize, progress); 1130 this.appendMode = true; 1131 } else { 1132 out = fs 1133 .create(p, true, bufferSize, replication, blockSize, progress); 1134 } 1135 } else { 1136 out = streamOption.getValue(); 1137 } 1138 Class<?> keyClass = keyClassOption == null ? 1139 Object.class : keyClassOption.getValue(); 1140 Class<?> valueClass = valueClassOption == null ? 1141 Object.class : valueClassOption.getValue(); 1142 Metadata metadata = metadataOption == null ? 1143 new Metadata() : metadataOption.getValue(); 1144 this.compress = compressionTypeOption.getValue(); 1145 final CompressionCodec codec = compressionTypeOption.getCodec(); 1146 if (codec != null && 1147 (codec instanceof GzipCodec) && 1148 !NativeCodeLoader.isNativeCodeLoaded() && 1149 !ZlibFactory.isNativeZlibLoaded(conf)) { 1150 throw new IllegalArgumentException("SequenceFile doesn't work with " + 1151 "GzipCodec without native-hadoop " + 1152 "code!"); 1153 } 1154 init(conf, out, ownStream, keyClass, valueClass, codec, metadata); 1155 } 1156 1157 /** Create the named file. 1158 * @deprecated Use 1159 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 1160 * instead. 1161 */ 1162 @Deprecated 1163 public Writer(FileSystem fs, Configuration conf, Path name, 1164 Class keyClass, Class valClass) throws IOException { 1165 this.compress = CompressionType.NONE; 1166 init(conf, fs.create(name), true, keyClass, valClass, null, 1167 new Metadata()); 1168 } 1169 1170 /** Create the named file with write-progress reporter. 1171 * @deprecated Use 1172 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 1173 * instead. 1174 */ 1175 @Deprecated 1176 public Writer(FileSystem fs, Configuration conf, Path name, 1177 Class keyClass, Class valClass, 1178 Progressable progress, Metadata metadata) throws IOException { 1179 this.compress = CompressionType.NONE; 1180 init(conf, fs.create(name, progress), true, keyClass, valClass, 1181 null, metadata); 1182 } 1183 1184 /** Create the named file with write-progress reporter. 1185 * @deprecated Use 1186 * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 1187 * instead. 1188 */ 1189 @Deprecated 1190 public Writer(FileSystem fs, Configuration conf, Path name, 1191 Class keyClass, Class valClass, 1192 int bufferSize, short replication, long blockSize, 1193 Progressable progress, Metadata metadata) throws IOException { 1194 this.compress = CompressionType.NONE; 1195 init(conf, 1196 fs.create(name, true, bufferSize, replication, blockSize, progress), 1197 true, keyClass, valClass, null, metadata); 1198 } 1199 1200 boolean isCompressed() { return compress != CompressionType.NONE; } 1201 boolean isBlockCompressed() { return compress == CompressionType.BLOCK; } 1202 1203 Writer ownStream() { this.ownOutputStream = true; return this; } 1204 1205 /** Write and flush the file header. */ 1206 private void writeFileHeader() 1207 throws IOException { 1208 out.write(VERSION); 1209 Text.writeString(out, keyClass.getName()); 1210 Text.writeString(out, valClass.getName()); 1211 1212 out.writeBoolean(this.isCompressed()); 1213 out.writeBoolean(this.isBlockCompressed()); 1214 1215 if (this.isCompressed()) { 1216 Text.writeString(out, (codec.getClass()).getName()); 1217 } 1218 this.metadata.write(out); 1219 out.write(sync); // write the sync bytes 1220 out.flush(); // flush header 1221 } 1222 1223 /** Initialize. */ 1224 @SuppressWarnings("unchecked") 1225 void init(Configuration conf, FSDataOutputStream out, boolean ownStream, 1226 Class keyClass, Class valClass, 1227 CompressionCodec codec, Metadata metadata) 1228 throws IOException { 1229 this.conf = conf; 1230 this.out = out; 1231 this.ownOutputStream = ownStream; 1232 this.keyClass = keyClass; 1233 this.valClass = valClass; 1234 this.codec = codec; 1235 this.metadata = metadata; 1236 SerializationFactory serializationFactory = new SerializationFactory(conf); 1237 this.keySerializer = serializationFactory.getSerializer(keyClass); 1238 if (this.keySerializer == null) { 1239 throw new IOException( 1240 "Could not find a serializer for the Key class: '" 1241 + keyClass.getCanonicalName() + "'. " 1242 + "Please ensure that the configuration '" + 1243 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1244 + "properly configured, if you're using" 1245 + "custom serialization."); 1246 } 1247 this.keySerializer.open(buffer); 1248 this.uncompressedValSerializer = serializationFactory.getSerializer(valClass); 1249 if (this.uncompressedValSerializer == null) { 1250 throw new IOException( 1251 "Could not find a serializer for the Value class: '" 1252 + valClass.getCanonicalName() + "'. " 1253 + "Please ensure that the configuration '" + 1254 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1255 + "properly configured, if you're using" 1256 + "custom serialization."); 1257 } 1258 this.uncompressedValSerializer.open(buffer); 1259 if (this.codec != null) { 1260 ReflectionUtils.setConf(this.codec, this.conf); 1261 this.compressor = CodecPool.getCompressor(this.codec); 1262 this.deflateFilter = this.codec.createOutputStream(buffer, compressor); 1263 this.deflateOut = 1264 new DataOutputStream(new BufferedOutputStream(deflateFilter)); 1265 this.compressedValSerializer = serializationFactory.getSerializer(valClass); 1266 if (this.compressedValSerializer == null) { 1267 throw new IOException( 1268 "Could not find a serializer for the Value class: '" 1269 + valClass.getCanonicalName() + "'. " 1270 + "Please ensure that the configuration '" + 1271 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1272 + "properly configured, if you're using" 1273 + "custom serialization."); 1274 } 1275 this.compressedValSerializer.open(deflateOut); 1276 } 1277 1278 if (appendMode) { 1279 sync(); 1280 } else { 1281 writeFileHeader(); 1282 } 1283 } 1284 1285 /** Returns the class of keys in this file. */ 1286 public Class getKeyClass() { return keyClass; } 1287 1288 /** Returns the class of values in this file. */ 1289 public Class getValueClass() { return valClass; } 1290 1291 /** Returns the compression codec of data in this file. */ 1292 public CompressionCodec getCompressionCodec() { return codec; } 1293 1294 /** create a sync point */ 1295 public void sync() throws IOException { 1296 if (sync != null && lastSyncPos != out.getPos()) { 1297 out.writeInt(SYNC_ESCAPE); // mark the start of the sync 1298 out.write(sync); // write sync 1299 lastSyncPos = out.getPos(); // update lastSyncPos 1300 } 1301 } 1302 1303 /** 1304 * flush all currently written data to the file system 1305 * @deprecated Use {@link #hsync()} or {@link #hflush()} instead 1306 */ 1307 @Deprecated 1308 public void syncFs() throws IOException { 1309 if (out != null) { 1310 out.sync(); // flush contents to file system 1311 } 1312 } 1313 1314 @Override 1315 public void hsync() throws IOException { 1316 if (out != null) { 1317 out.hsync(); 1318 } 1319 } 1320 1321 @Override 1322 public void hflush() throws IOException { 1323 if (out != null) { 1324 out.hflush(); 1325 } 1326 } 1327 1328 /** Returns the configuration of this file. */ 1329 Configuration getConf() { return conf; } 1330 1331 /** Close the file. */ 1332 @Override 1333 public synchronized void close() throws IOException { 1334 keySerializer.close(); 1335 uncompressedValSerializer.close(); 1336 if (compressedValSerializer != null) { 1337 compressedValSerializer.close(); 1338 } 1339 1340 CodecPool.returnCompressor(compressor); 1341 compressor = null; 1342 1343 if (out != null) { 1344 1345 // Close the underlying stream iff we own it... 1346 if (ownOutputStream) { 1347 out.close(); 1348 } else { 1349 out.flush(); 1350 } 1351 out = null; 1352 } 1353 } 1354 1355 synchronized void checkAndWriteSync() throws IOException { 1356 if (sync != null && 1357 out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync 1358 sync(); 1359 } 1360 } 1361 1362 /** Append a key/value pair. */ 1363 public void append(Writable key, Writable val) 1364 throws IOException { 1365 append((Object) key, (Object) val); 1366 } 1367 1368 /** Append a key/value pair. */ 1369 @SuppressWarnings("unchecked") 1370 public synchronized void append(Object key, Object val) 1371 throws IOException { 1372 if (key.getClass() != keyClass) 1373 throw new IOException("wrong key class: "+key.getClass().getName() 1374 +" is not "+keyClass); 1375 if (val.getClass() != valClass) 1376 throw new IOException("wrong value class: "+val.getClass().getName() 1377 +" is not "+valClass); 1378 1379 buffer.reset(); 1380 1381 // Append the 'key' 1382 keySerializer.serialize(key); 1383 int keyLength = buffer.getLength(); 1384 if (keyLength < 0) 1385 throw new IOException("negative length keys not allowed: " + key); 1386 1387 // Append the 'value' 1388 if (compress == CompressionType.RECORD) { 1389 deflateFilter.resetState(); 1390 compressedValSerializer.serialize(val); 1391 deflateOut.flush(); 1392 deflateFilter.finish(); 1393 } else { 1394 uncompressedValSerializer.serialize(val); 1395 } 1396 1397 // Write the record out 1398 checkAndWriteSync(); // sync 1399 out.writeInt(buffer.getLength()); // total record length 1400 out.writeInt(keyLength); // key portion length 1401 out.write(buffer.getData(), 0, buffer.getLength()); // data 1402 } 1403 1404 public synchronized void appendRaw(byte[] keyData, int keyOffset, 1405 int keyLength, ValueBytes val) throws IOException { 1406 if (keyLength < 0) 1407 throw new IOException("negative length keys not allowed: " + keyLength); 1408 1409 int valLength = val.getSize(); 1410 1411 checkAndWriteSync(); 1412 1413 out.writeInt(keyLength+valLength); // total record length 1414 out.writeInt(keyLength); // key portion length 1415 out.write(keyData, keyOffset, keyLength); // key 1416 val.writeUncompressedBytes(out); // value 1417 } 1418 1419 /** Returns the current length of the output file. 1420 * 1421 * <p>This always returns a synchronized position. In other words, 1422 * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position 1423 * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called. However 1424 * the key may be earlier in the file than key last written when this 1425 * method was called (e.g., with block-compression, it may be the first key 1426 * in the block that was being written when this method was called). 1427 */ 1428 public synchronized long getLength() throws IOException { 1429 return out.getPos(); 1430 } 1431 1432 } // class Writer 1433 1434 /** Write key/compressed-value pairs to a sequence-format file. */ 1435 static class RecordCompressWriter extends Writer { 1436 1437 RecordCompressWriter(Configuration conf, 1438 Option... options) throws IOException { 1439 super(conf, options); 1440 } 1441 1442 /** Append a key/value pair. */ 1443 @Override 1444 @SuppressWarnings("unchecked") 1445 public synchronized void append(Object key, Object val) 1446 throws IOException { 1447 if (key.getClass() != keyClass) 1448 throw new IOException("wrong key class: "+key.getClass().getName() 1449 +" is not "+keyClass); 1450 if (val.getClass() != valClass) 1451 throw new IOException("wrong value class: "+val.getClass().getName() 1452 +" is not "+valClass); 1453 1454 buffer.reset(); 1455 1456 // Append the 'key' 1457 keySerializer.serialize(key); 1458 int keyLength = buffer.getLength(); 1459 if (keyLength < 0) 1460 throw new IOException("negative length keys not allowed: " + key); 1461 1462 // Compress 'value' and append it 1463 deflateFilter.resetState(); 1464 compressedValSerializer.serialize(val); 1465 deflateOut.flush(); 1466 deflateFilter.finish(); 1467 1468 // Write the record out 1469 checkAndWriteSync(); // sync 1470 out.writeInt(buffer.getLength()); // total record length 1471 out.writeInt(keyLength); // key portion length 1472 out.write(buffer.getData(), 0, buffer.getLength()); // data 1473 } 1474 1475 /** Append a key/value pair. */ 1476 @Override 1477 public synchronized void appendRaw(byte[] keyData, int keyOffset, 1478 int keyLength, ValueBytes val) throws IOException { 1479 1480 if (keyLength < 0) 1481 throw new IOException("negative length keys not allowed: " + keyLength); 1482 1483 int valLength = val.getSize(); 1484 1485 checkAndWriteSync(); // sync 1486 out.writeInt(keyLength+valLength); // total record length 1487 out.writeInt(keyLength); // key portion length 1488 out.write(keyData, keyOffset, keyLength); // 'key' data 1489 val.writeCompressedBytes(out); // 'value' data 1490 } 1491 1492 } // RecordCompressionWriter 1493 1494 /** Write compressed key/value blocks to a sequence-format file. */ 1495 static class BlockCompressWriter extends Writer { 1496 1497 private int noBufferedRecords = 0; 1498 1499 private DataOutputBuffer keyLenBuffer = new DataOutputBuffer(); 1500 private DataOutputBuffer keyBuffer = new DataOutputBuffer(); 1501 1502 private DataOutputBuffer valLenBuffer = new DataOutputBuffer(); 1503 private DataOutputBuffer valBuffer = new DataOutputBuffer(); 1504 1505 private final int compressionBlockSize; 1506 1507 BlockCompressWriter(Configuration conf, 1508 Option... options) throws IOException { 1509 super(conf, options); 1510 compressionBlockSize = 1511 conf.getInt("io.seqfile.compress.blocksize", 1000000); 1512 keySerializer.close(); 1513 keySerializer.open(keyBuffer); 1514 uncompressedValSerializer.close(); 1515 uncompressedValSerializer.open(valBuffer); 1516 } 1517 1518 /** Workhorse to check and write out compressed data/lengths */ 1519 private synchronized 1520 void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 1521 throws IOException { 1522 deflateFilter.resetState(); 1523 buffer.reset(); 1524 deflateOut.write(uncompressedDataBuffer.getData(), 0, 1525 uncompressedDataBuffer.getLength()); 1526 deflateOut.flush(); 1527 deflateFilter.finish(); 1528 1529 WritableUtils.writeVInt(out, buffer.getLength()); 1530 out.write(buffer.getData(), 0, buffer.getLength()); 1531 } 1532 1533 /** Compress and flush contents to dfs */ 1534 @Override 1535 public synchronized void sync() throws IOException { 1536 if (noBufferedRecords > 0) { 1537 super.sync(); 1538 1539 // No. of records 1540 WritableUtils.writeVInt(out, noBufferedRecords); 1541 1542 // Write 'keys' and lengths 1543 writeBuffer(keyLenBuffer); 1544 writeBuffer(keyBuffer); 1545 1546 // Write 'values' and lengths 1547 writeBuffer(valLenBuffer); 1548 writeBuffer(valBuffer); 1549 1550 // Flush the file-stream 1551 out.flush(); 1552 1553 // Reset internal states 1554 keyLenBuffer.reset(); 1555 keyBuffer.reset(); 1556 valLenBuffer.reset(); 1557 valBuffer.reset(); 1558 noBufferedRecords = 0; 1559 } 1560 1561 } 1562 1563 /** Close the file. */ 1564 @Override 1565 public synchronized void close() throws IOException { 1566 if (out != null) { 1567 sync(); 1568 } 1569 super.close(); 1570 } 1571 1572 /** Append a key/value pair. */ 1573 @Override 1574 @SuppressWarnings("unchecked") 1575 public synchronized void append(Object key, Object val) 1576 throws IOException { 1577 if (key.getClass() != keyClass) 1578 throw new IOException("wrong key class: "+key+" is not "+keyClass); 1579 if (val.getClass() != valClass) 1580 throw new IOException("wrong value class: "+val+" is not "+valClass); 1581 1582 // Save key/value into respective buffers 1583 int oldKeyLength = keyBuffer.getLength(); 1584 keySerializer.serialize(key); 1585 int keyLength = keyBuffer.getLength() - oldKeyLength; 1586 if (keyLength < 0) 1587 throw new IOException("negative length keys not allowed: " + key); 1588 WritableUtils.writeVInt(keyLenBuffer, keyLength); 1589 1590 int oldValLength = valBuffer.getLength(); 1591 uncompressedValSerializer.serialize(val); 1592 int valLength = valBuffer.getLength() - oldValLength; 1593 WritableUtils.writeVInt(valLenBuffer, valLength); 1594 1595 // Added another key/value pair 1596 ++noBufferedRecords; 1597 1598 // Compress and flush? 1599 int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 1600 if (currentBlockSize >= compressionBlockSize) { 1601 sync(); 1602 } 1603 } 1604 1605 /** Append a key/value pair. */ 1606 @Override 1607 public synchronized void appendRaw(byte[] keyData, int keyOffset, 1608 int keyLength, ValueBytes val) throws IOException { 1609 1610 if (keyLength < 0) 1611 throw new IOException("negative length keys not allowed"); 1612 1613 int valLength = val.getSize(); 1614 1615 // Save key/value data in relevant buffers 1616 WritableUtils.writeVInt(keyLenBuffer, keyLength); 1617 keyBuffer.write(keyData, keyOffset, keyLength); 1618 WritableUtils.writeVInt(valLenBuffer, valLength); 1619 val.writeUncompressedBytes(valBuffer); 1620 1621 // Added another key/value pair 1622 ++noBufferedRecords; 1623 1624 // Compress and flush? 1625 int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 1626 if (currentBlockSize >= compressionBlockSize) { 1627 sync(); 1628 } 1629 } 1630 1631 } // BlockCompressionWriter 1632 1633 /** Get the configured buffer size */ 1634 private static int getBufferSize(Configuration conf) { 1635 return conf.getInt("io.file.buffer.size", 4096); 1636 } 1637 1638 /** Reads key/value pairs from a sequence-format file. */ 1639 public static class Reader implements java.io.Closeable { 1640 private String filename; 1641 private FSDataInputStream in; 1642 private DataOutputBuffer outBuf = new DataOutputBuffer(); 1643 1644 private byte version; 1645 1646 private String keyClassName; 1647 private String valClassName; 1648 private Class keyClass; 1649 private Class valClass; 1650 1651 private CompressionCodec codec = null; 1652 private Metadata metadata = null; 1653 1654 private byte[] sync = new byte[SYNC_HASH_SIZE]; 1655 private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; 1656 private boolean syncSeen; 1657 1658 private long headerEnd; 1659 private long end; 1660 private int keyLength; 1661 private int recordLength; 1662 1663 private boolean decompress; 1664 private boolean blockCompressed; 1665 1666 private Configuration conf; 1667 1668 private int noBufferedRecords = 0; 1669 private boolean lazyDecompress = true; 1670 private boolean valuesDecompressed = true; 1671 1672 private int noBufferedKeys = 0; 1673 private int noBufferedValues = 0; 1674 1675 private DataInputBuffer keyLenBuffer = null; 1676 private CompressionInputStream keyLenInFilter = null; 1677 private DataInputStream keyLenIn = null; 1678 private Decompressor keyLenDecompressor = null; 1679 private DataInputBuffer keyBuffer = null; 1680 private CompressionInputStream keyInFilter = null; 1681 private DataInputStream keyIn = null; 1682 private Decompressor keyDecompressor = null; 1683 1684 private DataInputBuffer valLenBuffer = null; 1685 private CompressionInputStream valLenInFilter = null; 1686 private DataInputStream valLenIn = null; 1687 private Decompressor valLenDecompressor = null; 1688 private DataInputBuffer valBuffer = null; 1689 private CompressionInputStream valInFilter = null; 1690 private DataInputStream valIn = null; 1691 private Decompressor valDecompressor = null; 1692 1693 private Deserializer keyDeserializer; 1694 private Deserializer valDeserializer; 1695 1696 /** 1697 * A tag interface for all of the Reader options 1698 */ 1699 public static interface Option {} 1700 1701 /** 1702 * Create an option to specify the path name of the sequence file. 1703 * @param value the path to read 1704 * @return a new option 1705 */ 1706 public static Option file(Path value) { 1707 return new FileOption(value); 1708 } 1709 1710 /** 1711 * Create an option to specify the stream with the sequence file. 1712 * @param value the stream to read. 1713 * @return a new option 1714 */ 1715 public static Option stream(FSDataInputStream value) { 1716 return new InputStreamOption(value); 1717 } 1718 1719 /** 1720 * Create an option to specify the starting byte to read. 1721 * @param value the number of bytes to skip over 1722 * @return a new option 1723 */ 1724 public static Option start(long value) { 1725 return new StartOption(value); 1726 } 1727 1728 /** 1729 * Create an option to specify the number of bytes to read. 1730 * @param value the number of bytes to read 1731 * @return a new option 1732 */ 1733 public static Option length(long value) { 1734 return new LengthOption(value); 1735 } 1736 1737 /** 1738 * Create an option with the buffer size for reading the given pathname. 1739 * @param value the number of bytes to buffer 1740 * @return a new option 1741 */ 1742 public static Option bufferSize(int value) { 1743 return new BufferSizeOption(value); 1744 } 1745 1746 private static class FileOption extends Options.PathOption 1747 implements Option { 1748 private FileOption(Path value) { 1749 super(value); 1750 } 1751 } 1752 1753 private static class InputStreamOption 1754 extends Options.FSDataInputStreamOption 1755 implements Option { 1756 private InputStreamOption(FSDataInputStream value) { 1757 super(value); 1758 } 1759 } 1760 1761 private static class StartOption extends Options.LongOption 1762 implements Option { 1763 private StartOption(long value) { 1764 super(value); 1765 } 1766 } 1767 1768 private static class LengthOption extends Options.LongOption 1769 implements Option { 1770 private LengthOption(long value) { 1771 super(value); 1772 } 1773 } 1774 1775 private static class BufferSizeOption extends Options.IntegerOption 1776 implements Option { 1777 private BufferSizeOption(int value) { 1778 super(value); 1779 } 1780 } 1781 1782 // only used directly 1783 private static class OnlyHeaderOption extends Options.BooleanOption 1784 implements Option { 1785 private OnlyHeaderOption() { 1786 super(true); 1787 } 1788 } 1789 1790 public Reader(Configuration conf, Option... opts) throws IOException { 1791 // Look up the options, these are null if not set 1792 FileOption fileOpt = Options.getOption(FileOption.class, opts); 1793 InputStreamOption streamOpt = 1794 Options.getOption(InputStreamOption.class, opts); 1795 StartOption startOpt = Options.getOption(StartOption.class, opts); 1796 LengthOption lenOpt = Options.getOption(LengthOption.class, opts); 1797 BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); 1798 OnlyHeaderOption headerOnly = 1799 Options.getOption(OnlyHeaderOption.class, opts); 1800 // check for consistency 1801 if ((fileOpt == null) == (streamOpt == null)) { 1802 throw new 1803 IllegalArgumentException("File or stream option must be specified"); 1804 } 1805 if (fileOpt == null && bufOpt != null) { 1806 throw new IllegalArgumentException("buffer size can only be set when" + 1807 " a file is specified."); 1808 } 1809 // figure out the real values 1810 Path filename = null; 1811 FSDataInputStream file; 1812 final long len; 1813 if (fileOpt != null) { 1814 filename = fileOpt.getValue(); 1815 FileSystem fs = filename.getFileSystem(conf); 1816 int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); 1817 len = null == lenOpt 1818 ? fs.getFileStatus(filename).getLen() 1819 : lenOpt.getValue(); 1820 file = openFile(fs, filename, bufSize, len); 1821 } else { 1822 len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); 1823 file = streamOpt.getValue(); 1824 } 1825 long start = startOpt == null ? 0 : startOpt.getValue(); 1826 // really set up 1827 initialize(filename, file, start, len, conf, headerOnly != null); 1828 } 1829 1830 /** 1831 * Construct a reader by opening a file from the given file system. 1832 * @param fs The file system used to open the file. 1833 * @param file The file being read. 1834 * @param conf Configuration 1835 * @throws IOException 1836 * @deprecated Use Reader(Configuration, Option...) instead. 1837 */ 1838 @Deprecated 1839 public Reader(FileSystem fs, Path file, 1840 Configuration conf) throws IOException { 1841 this(conf, file(file.makeQualified(fs))); 1842 } 1843 1844 /** 1845 * Construct a reader by the given input stream. 1846 * @param in An input stream. 1847 * @param buffersize unused 1848 * @param start The starting position. 1849 * @param length The length being read. 1850 * @param conf Configuration 1851 * @throws IOException 1852 * @deprecated Use Reader(Configuration, Reader.Option...) instead. 1853 */ 1854 @Deprecated 1855 public Reader(FSDataInputStream in, int buffersize, 1856 long start, long length, Configuration conf) throws IOException { 1857 this(conf, stream(in), start(start), length(length)); 1858 } 1859 1860 /** Common work of the constructors. */ 1861 private void initialize(Path filename, FSDataInputStream in, 1862 long start, long length, Configuration conf, 1863 boolean tempReader) throws IOException { 1864 if (in == null) { 1865 throw new IllegalArgumentException("in == null"); 1866 } 1867 this.filename = filename == null ? "<unknown>" : filename.toString(); 1868 this.in = in; 1869 this.conf = conf; 1870 boolean succeeded = false; 1871 try { 1872 seek(start); 1873 this.end = this.in.getPos() + length; 1874 // if it wrapped around, use the max 1875 if (end < length) { 1876 end = Long.MAX_VALUE; 1877 } 1878 init(tempReader); 1879 succeeded = true; 1880 } finally { 1881 if (!succeeded) { 1882 IOUtils.cleanup(LOG, this.in); 1883 } 1884 } 1885 } 1886 1887 /** 1888 * Override this method to specialize the type of 1889 * {@link FSDataInputStream} returned. 1890 * @param fs The file system used to open the file. 1891 * @param file The file being read. 1892 * @param bufferSize The buffer size used to read the file. 1893 * @param length The length being read if it is >= 0. Otherwise, 1894 * the length is not available. 1895 * @return The opened stream. 1896 * @throws IOException 1897 */ 1898 protected FSDataInputStream openFile(FileSystem fs, Path file, 1899 int bufferSize, long length) throws IOException { 1900 return fs.open(file, bufferSize); 1901 } 1902 1903 /** 1904 * Initialize the {@link Reader} 1905 * @param tmpReader <code>true</code> if we are constructing a temporary 1906 * reader {@link SequenceFile.Sorter.cloneFileAttributes}, 1907 * and hence do not initialize every component; 1908 * <code>false</code> otherwise. 1909 * @throws IOException 1910 */ 1911 private void init(boolean tempReader) throws IOException { 1912 byte[] versionBlock = new byte[VERSION.length]; 1913 in.readFully(versionBlock); 1914 1915 if ((versionBlock[0] != VERSION[0]) || 1916 (versionBlock[1] != VERSION[1]) || 1917 (versionBlock[2] != VERSION[2])) 1918 throw new IOException(this + " not a SequenceFile"); 1919 1920 // Set 'version' 1921 version = versionBlock[3]; 1922 if (version > VERSION[3]) 1923 throw new VersionMismatchException(VERSION[3], version); 1924 1925 if (version < BLOCK_COMPRESS_VERSION) { 1926 UTF8 className = new UTF8(); 1927 1928 className.readFields(in); 1929 keyClassName = className.toStringChecked(); // key class name 1930 1931 className.readFields(in); 1932 valClassName = className.toStringChecked(); // val class name 1933 } else { 1934 keyClassName = Text.readString(in); 1935 valClassName = Text.readString(in); 1936 } 1937 1938 if (version > 2) { // if version > 2 1939 this.decompress = in.readBoolean(); // is compressed? 1940 } else { 1941 decompress = false; 1942 } 1943 1944 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 1945 this.blockCompressed = in.readBoolean(); // is block-compressed? 1946 } else { 1947 blockCompressed = false; 1948 } 1949 1950 // if version >= 5 1951 // setup the compression codec 1952 if (decompress) { 1953 if (version >= CUSTOM_COMPRESS_VERSION) { 1954 String codecClassname = Text.readString(in); 1955 try { 1956 Class<? extends CompressionCodec> codecClass 1957 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); 1958 this.codec = ReflectionUtils.newInstance(codecClass, conf); 1959 } catch (ClassNotFoundException cnfe) { 1960 throw new IllegalArgumentException("Unknown codec: " + 1961 codecClassname, cnfe); 1962 } 1963 } else { 1964 codec = new DefaultCodec(); 1965 ((Configurable)codec).setConf(conf); 1966 } 1967 } 1968 1969 this.metadata = new Metadata(); 1970 if (version >= VERSION_WITH_METADATA) { // if version >= 6 1971 this.metadata.readFields(in); 1972 } 1973 1974 if (version > 1) { // if version > 1 1975 in.readFully(sync); // read sync bytes 1976 headerEnd = in.getPos(); // record end of header 1977 } 1978 1979 // Initialize... *not* if this we are constructing a temporary Reader 1980 if (!tempReader) { 1981 valBuffer = new DataInputBuffer(); 1982 if (decompress) { 1983 valDecompressor = CodecPool.getDecompressor(codec); 1984 valInFilter = codec.createInputStream(valBuffer, valDecompressor); 1985 valIn = new DataInputStream(valInFilter); 1986 } else { 1987 valIn = valBuffer; 1988 } 1989 1990 if (blockCompressed) { 1991 keyLenBuffer = new DataInputBuffer(); 1992 keyBuffer = new DataInputBuffer(); 1993 valLenBuffer = new DataInputBuffer(); 1994 1995 keyLenDecompressor = CodecPool.getDecompressor(codec); 1996 keyLenInFilter = codec.createInputStream(keyLenBuffer, 1997 keyLenDecompressor); 1998 keyLenIn = new DataInputStream(keyLenInFilter); 1999 2000 keyDecompressor = CodecPool.getDecompressor(codec); 2001 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); 2002 keyIn = new DataInputStream(keyInFilter); 2003 2004 valLenDecompressor = CodecPool.getDecompressor(codec); 2005 valLenInFilter = codec.createInputStream(valLenBuffer, 2006 valLenDecompressor); 2007 valLenIn = new DataInputStream(valLenInFilter); 2008 } 2009 2010 SerializationFactory serializationFactory = 2011 new SerializationFactory(conf); 2012 this.keyDeserializer = 2013 getDeserializer(serializationFactory, getKeyClass()); 2014 if (this.keyDeserializer == null) { 2015 throw new IOException( 2016 "Could not find a deserializer for the Key class: '" 2017 + getKeyClass().getCanonicalName() + "'. " 2018 + "Please ensure that the configuration '" + 2019 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 2020 + "properly configured, if you're using " 2021 + "custom serialization."); 2022 } 2023 if (!blockCompressed) { 2024 this.keyDeserializer.open(valBuffer); 2025 } else { 2026 this.keyDeserializer.open(keyIn); 2027 } 2028 this.valDeserializer = 2029 getDeserializer(serializationFactory, getValueClass()); 2030 if (this.valDeserializer == null) { 2031 throw new IOException( 2032 "Could not find a deserializer for the Value class: '" 2033 + getValueClass().getCanonicalName() + "'. " 2034 + "Please ensure that the configuration '" + 2035 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 2036 + "properly configured, if you're using " 2037 + "custom serialization."); 2038 } 2039 this.valDeserializer.open(valIn); 2040 } 2041 } 2042 2043 @SuppressWarnings("unchecked") 2044 private Deserializer getDeserializer(SerializationFactory sf, Class c) { 2045 return sf.getDeserializer(c); 2046 } 2047 2048 /** Close the file. */ 2049 @Override 2050 public synchronized void close() throws IOException { 2051 // Return the decompressors to the pool 2052 CodecPool.returnDecompressor(keyLenDecompressor); 2053 CodecPool.returnDecompressor(keyDecompressor); 2054 CodecPool.returnDecompressor(valLenDecompressor); 2055 CodecPool.returnDecompressor(valDecompressor); 2056 keyLenDecompressor = keyDecompressor = null; 2057 valLenDecompressor = valDecompressor = null; 2058 2059 if (keyDeserializer != null) { 2060 keyDeserializer.close(); 2061 } 2062 if (valDeserializer != null) { 2063 valDeserializer.close(); 2064 } 2065 2066 // Close the input-stream 2067 in.close(); 2068 } 2069 2070 /** Returns the name of the key class. */ 2071 public String getKeyClassName() { 2072 return keyClassName; 2073 } 2074 2075 /** Returns the class of keys in this file. */ 2076 public synchronized Class<?> getKeyClass() { 2077 if (null == keyClass) { 2078 try { 2079 keyClass = WritableName.getClass(getKeyClassName(), conf); 2080 } catch (IOException e) { 2081 throw new RuntimeException(e); 2082 } 2083 } 2084 return keyClass; 2085 } 2086 2087 /** Returns the name of the value class. */ 2088 public String getValueClassName() { 2089 return valClassName; 2090 } 2091 2092 /** Returns the class of values in this file. */ 2093 public synchronized Class<?> getValueClass() { 2094 if (null == valClass) { 2095 try { 2096 valClass = WritableName.getClass(getValueClassName(), conf); 2097 } catch (IOException e) { 2098 throw new RuntimeException(e); 2099 } 2100 } 2101 return valClass; 2102 } 2103 2104 /** Returns true if values are compressed. */ 2105 public boolean isCompressed() { return decompress; } 2106 2107 /** Returns true if records are block-compressed. */ 2108 public boolean isBlockCompressed() { return blockCompressed; } 2109 2110 /** Returns the compression codec of data in this file. */ 2111 public CompressionCodec getCompressionCodec() { return codec; } 2112 2113 private byte[] getSync() { 2114 return sync; 2115 } 2116 2117 private byte getVersion() { 2118 return version; 2119 } 2120 2121 /** 2122 * Get the compression type for this file. 2123 * @return the compression type 2124 */ 2125 public CompressionType getCompressionType() { 2126 if (decompress) { 2127 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 2128 } else { 2129 return CompressionType.NONE; 2130 } 2131 } 2132 2133 /** Returns the metadata object of the file */ 2134 public Metadata getMetadata() { 2135 return this.metadata; 2136 } 2137 2138 /** Returns the configuration used for this file. */ 2139 Configuration getConf() { return conf; } 2140 2141 /** Read a compressed buffer */ 2142 private synchronized void readBuffer(DataInputBuffer buffer, 2143 CompressionInputStream filter) throws IOException { 2144 // Read data into a temporary buffer 2145 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 2146 2147 try { 2148 int dataBufferLength = WritableUtils.readVInt(in); 2149 dataBuffer.write(in, dataBufferLength); 2150 2151 // Set up 'buffer' connected to the input-stream 2152 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 2153 } finally { 2154 dataBuffer.close(); 2155 } 2156 2157 // Reset the codec 2158 filter.resetState(); 2159 } 2160 2161 /** Read the next 'compressed' block */ 2162 private synchronized void readBlock() throws IOException { 2163 // Check if we need to throw away a whole block of 2164 // 'values' due to 'lazy decompression' 2165 if (lazyDecompress && !valuesDecompressed) { 2166 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2167 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2168 } 2169 2170 // Reset internal states 2171 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2172 valuesDecompressed = false; 2173 2174 //Process sync 2175 if (sync != null) { 2176 in.readInt(); 2177 in.readFully(syncCheck); // read syncCheck 2178 if (!Arrays.equals(sync, syncCheck)) // check it 2179 throw new IOException("File is corrupt!"); 2180 } 2181 syncSeen = true; 2182 2183 // Read number of records in this block 2184 noBufferedRecords = WritableUtils.readVInt(in); 2185 2186 // Read key lengths and keys 2187 readBuffer(keyLenBuffer, keyLenInFilter); 2188 readBuffer(keyBuffer, keyInFilter); 2189 noBufferedKeys = noBufferedRecords; 2190 2191 // Read value lengths and values 2192 if (!lazyDecompress) { 2193 readBuffer(valLenBuffer, valLenInFilter); 2194 readBuffer(valBuffer, valInFilter); 2195 noBufferedValues = noBufferedRecords; 2196 valuesDecompressed = true; 2197 } 2198 } 2199 2200 /** 2201 * Position valLenIn/valIn to the 'value' 2202 * corresponding to the 'current' key 2203 */ 2204 private synchronized void seekToCurrentValue() throws IOException { 2205 if (!blockCompressed) { 2206 if (decompress) { 2207 valInFilter.resetState(); 2208 } 2209 valBuffer.reset(); 2210 } else { 2211 // Check if this is the first value in the 'block' to be read 2212 if (lazyDecompress && !valuesDecompressed) { 2213 // Read the value lengths and values 2214 readBuffer(valLenBuffer, valLenInFilter); 2215 readBuffer(valBuffer, valInFilter); 2216 noBufferedValues = noBufferedRecords; 2217 valuesDecompressed = true; 2218 } 2219 2220 // Calculate the no. of bytes to skip 2221 // Note: 'current' key has already been read! 2222 int skipValBytes = 0; 2223 int currentKey = noBufferedKeys + 1; 2224 for (int i=noBufferedValues; i > currentKey; --i) { 2225 skipValBytes += WritableUtils.readVInt(valLenIn); 2226 --noBufferedValues; 2227 } 2228 2229 // Skip to the 'val' corresponding to 'current' key 2230 if (skipValBytes > 0) { 2231 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2232 throw new IOException("Failed to seek to " + currentKey + 2233 "(th) value!"); 2234 } 2235 } 2236 } 2237 } 2238 2239 /** 2240 * Get the 'value' corresponding to the last read 'key'. 2241 * @param val : The 'value' to be read. 2242 * @throws IOException 2243 */ 2244 public synchronized void getCurrentValue(Writable val) 2245 throws IOException { 2246 if (val instanceof Configurable) { 2247 ((Configurable) val).setConf(this.conf); 2248 } 2249 2250 // Position stream to 'current' value 2251 seekToCurrentValue(); 2252 2253 if (!blockCompressed) { 2254 val.readFields(valIn); 2255 2256 if (valIn.read() > 0) { 2257 LOG.info("available bytes: " + valIn.available()); 2258 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2259 + " bytes, should read " + 2260 (valBuffer.getLength()-keyLength)); 2261 } 2262 } else { 2263 // Get the value 2264 int valLength = WritableUtils.readVInt(valLenIn); 2265 val.readFields(valIn); 2266 2267 // Read another compressed 'value' 2268 --noBufferedValues; 2269 2270 // Sanity check 2271 if ((valLength < 0) && LOG.isDebugEnabled()) { 2272 LOG.debug(val + " is a zero-length value"); 2273 } 2274 } 2275 2276 } 2277 2278 /** 2279 * Get the 'value' corresponding to the last read 'key'. 2280 * @param val : The 'value' to be read. 2281 * @throws IOException 2282 */ 2283 public synchronized Object getCurrentValue(Object val) 2284 throws IOException { 2285 if (val instanceof Configurable) { 2286 ((Configurable) val).setConf(this.conf); 2287 } 2288 2289 // Position stream to 'current' value 2290 seekToCurrentValue(); 2291 2292 if (!blockCompressed) { 2293 val = deserializeValue(val); 2294 2295 if (valIn.read() > 0) { 2296 LOG.info("available bytes: " + valIn.available()); 2297 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2298 + " bytes, should read " + 2299 (valBuffer.getLength()-keyLength)); 2300 } 2301 } else { 2302 // Get the value 2303 int valLength = WritableUtils.readVInt(valLenIn); 2304 val = deserializeValue(val); 2305 2306 // Read another compressed 'value' 2307 --noBufferedValues; 2308 2309 // Sanity check 2310 if ((valLength < 0) && LOG.isDebugEnabled()) { 2311 LOG.debug(val + " is a zero-length value"); 2312 } 2313 } 2314 return val; 2315 2316 } 2317 2318 @SuppressWarnings("unchecked") 2319 private Object deserializeValue(Object val) throws IOException { 2320 return valDeserializer.deserialize(val); 2321 } 2322 2323 /** Read the next key in the file into <code>key</code>, skipping its 2324 * value. True if another entry exists, and false at end of file. */ 2325 public synchronized boolean next(Writable key) throws IOException { 2326 if (key.getClass() != getKeyClass()) 2327 throw new IOException("wrong key class: "+key.getClass().getName() 2328 +" is not "+keyClass); 2329 2330 if (!blockCompressed) { 2331 outBuf.reset(); 2332 2333 keyLength = next(outBuf); 2334 if (keyLength < 0) 2335 return false; 2336 2337 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2338 2339 key.readFields(valBuffer); 2340 valBuffer.mark(0); 2341 if (valBuffer.getPosition() != keyLength) 2342 throw new IOException(key + " read " + valBuffer.getPosition() 2343 + " bytes, should read " + keyLength); 2344 } else { 2345 //Reset syncSeen 2346 syncSeen = false; 2347 2348 if (noBufferedKeys == 0) { 2349 try { 2350 readBlock(); 2351 } catch (EOFException eof) { 2352 return false; 2353 } 2354 } 2355 2356 int keyLength = WritableUtils.readVInt(keyLenIn); 2357 2358 // Sanity check 2359 if (keyLength < 0) { 2360 return false; 2361 } 2362 2363 //Read another compressed 'key' 2364 key.readFields(keyIn); 2365 --noBufferedKeys; 2366 } 2367 2368 return true; 2369 } 2370 2371 /** Read the next key/value pair in the file into <code>key</code> and 2372 * <code>val</code>. Returns true if such a pair exists and false when at 2373 * end of file */ 2374 public synchronized boolean next(Writable key, Writable val) 2375 throws IOException { 2376 if (val.getClass() != getValueClass()) 2377 throw new IOException("wrong value class: "+val+" is not "+valClass); 2378 2379 boolean more = next(key); 2380 2381 if (more) { 2382 getCurrentValue(val); 2383 } 2384 2385 return more; 2386 } 2387 2388 /** 2389 * Read and return the next record length, potentially skipping over 2390 * a sync block. 2391 * @return the length of the next record or -1 if there is no next record 2392 * @throws IOException 2393 */ 2394 private synchronized int readRecordLength() throws IOException { 2395 if (in.getPos() >= end) { 2396 return -1; 2397 } 2398 int length = in.readInt(); 2399 if (version > 1 && sync != null && 2400 length == SYNC_ESCAPE) { // process a sync entry 2401 in.readFully(syncCheck); // read syncCheck 2402 if (!Arrays.equals(sync, syncCheck)) // check it 2403 throw new IOException("File is corrupt!"); 2404 syncSeen = true; 2405 if (in.getPos() >= end) { 2406 return -1; 2407 } 2408 length = in.readInt(); // re-read length 2409 } else { 2410 syncSeen = false; 2411 } 2412 2413 return length; 2414 } 2415 2416 /** Read the next key/value pair in the file into <code>buffer</code>. 2417 * Returns the length of the key read, or -1 if at end of file. The length 2418 * of the value may be computed by calling buffer.getLength() before and 2419 * after calls to this method. */ 2420 /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ 2421 @Deprecated 2422 synchronized int next(DataOutputBuffer buffer) throws IOException { 2423 // Unsupported for block-compressed sequence files 2424 if (blockCompressed) { 2425 throw new IOException("Unsupported call for block-compressed" + 2426 " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)"); 2427 } 2428 try { 2429 int length = readRecordLength(); 2430 if (length == -1) { 2431 return -1; 2432 } 2433 int keyLength = in.readInt(); 2434 buffer.write(in, length); 2435 return keyLength; 2436 } catch (ChecksumException e) { // checksum failure 2437 handleChecksumException(e); 2438 return next(buffer); 2439 } 2440 } 2441 2442 public ValueBytes createValueBytes() { 2443 ValueBytes val = null; 2444 if (!decompress || blockCompressed) { 2445 val = new UncompressedBytes(); 2446 } else { 2447 val = new CompressedBytes(codec); 2448 } 2449 return val; 2450 } 2451 2452 /** 2453 * Read 'raw' records. 2454 * @param key - The buffer into which the key is read 2455 * @param val - The 'raw' value 2456 * @return Returns the total record length or -1 for end of file 2457 * @throws IOException 2458 */ 2459 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 2460 throws IOException { 2461 if (!blockCompressed) { 2462 int length = readRecordLength(); 2463 if (length == -1) { 2464 return -1; 2465 } 2466 int keyLength = in.readInt(); 2467 int valLength = length - keyLength; 2468 key.write(in, keyLength); 2469 if (decompress) { 2470 CompressedBytes value = (CompressedBytes)val; 2471 value.reset(in, valLength); 2472 } else { 2473 UncompressedBytes value = (UncompressedBytes)val; 2474 value.reset(in, valLength); 2475 } 2476 2477 return length; 2478 } else { 2479 //Reset syncSeen 2480 syncSeen = false; 2481 2482 // Read 'key' 2483 if (noBufferedKeys == 0) { 2484 if (in.getPos() >= end) 2485 return -1; 2486 2487 try { 2488 readBlock(); 2489 } catch (EOFException eof) { 2490 return -1; 2491 } 2492 } 2493 int keyLength = WritableUtils.readVInt(keyLenIn); 2494 if (keyLength < 0) { 2495 throw new IOException("zero length key found!"); 2496 } 2497 key.write(keyIn, keyLength); 2498 --noBufferedKeys; 2499 2500 // Read raw 'value' 2501 seekToCurrentValue(); 2502 int valLength = WritableUtils.readVInt(valLenIn); 2503 UncompressedBytes rawValue = (UncompressedBytes)val; 2504 rawValue.reset(valIn, valLength); 2505 --noBufferedValues; 2506 2507 return (keyLength+valLength); 2508 } 2509 2510 } 2511 2512 /** 2513 * Read 'raw' keys. 2514 * @param key - The buffer into which the key is read 2515 * @return Returns the key length or -1 for end of file 2516 * @throws IOException 2517 */ 2518 public synchronized int nextRawKey(DataOutputBuffer key) 2519 throws IOException { 2520 if (!blockCompressed) { 2521 recordLength = readRecordLength(); 2522 if (recordLength == -1) { 2523 return -1; 2524 } 2525 keyLength = in.readInt(); 2526 key.write(in, keyLength); 2527 return keyLength; 2528 } else { 2529 //Reset syncSeen 2530 syncSeen = false; 2531 2532 // Read 'key' 2533 if (noBufferedKeys == 0) { 2534 if (in.getPos() >= end) 2535 return -1; 2536 2537 try { 2538 readBlock(); 2539 } catch (EOFException eof) { 2540 return -1; 2541 } 2542 } 2543 int keyLength = WritableUtils.readVInt(keyLenIn); 2544 if (keyLength < 0) { 2545 throw new IOException("zero length key found!"); 2546 } 2547 key.write(keyIn, keyLength); 2548 --noBufferedKeys; 2549 2550 return keyLength; 2551 } 2552 2553 } 2554 2555 /** Read the next key in the file, skipping its 2556 * value. Return null at end of file. */ 2557 public synchronized Object next(Object key) throws IOException { 2558 if (key != null && key.getClass() != getKeyClass()) { 2559 throw new IOException("wrong key class: "+key.getClass().getName() 2560 +" is not "+keyClass); 2561 } 2562 2563 if (!blockCompressed) { 2564 outBuf.reset(); 2565 2566 keyLength = next(outBuf); 2567 if (keyLength < 0) 2568 return null; 2569 2570 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2571 2572 key = deserializeKey(key); 2573 valBuffer.mark(0); 2574 if (valBuffer.getPosition() != keyLength) 2575 throw new IOException(key + " read " + valBuffer.getPosition() 2576 + " bytes, should read " + keyLength); 2577 } else { 2578 //Reset syncSeen 2579 syncSeen = false; 2580 2581 if (noBufferedKeys == 0) { 2582 try { 2583 readBlock(); 2584 } catch (EOFException eof) { 2585 return null; 2586 } 2587 } 2588 2589 int keyLength = WritableUtils.readVInt(keyLenIn); 2590 2591 // Sanity check 2592 if (keyLength < 0) { 2593 return null; 2594 } 2595 2596 //Read another compressed 'key' 2597 key = deserializeKey(key); 2598 --noBufferedKeys; 2599 } 2600 2601 return key; 2602 } 2603 2604 @SuppressWarnings("unchecked") 2605 private Object deserializeKey(Object key) throws IOException { 2606 return keyDeserializer.deserialize(key); 2607 } 2608 2609 /** 2610 * Read 'raw' values. 2611 * @param val - The 'raw' value 2612 * @return Returns the value length 2613 * @throws IOException 2614 */ 2615 public synchronized int nextRawValue(ValueBytes val) 2616 throws IOException { 2617 2618 // Position stream to current value 2619 seekToCurrentValue(); 2620 2621 if (!blockCompressed) { 2622 int valLength = recordLength - keyLength; 2623 if (decompress) { 2624 CompressedBytes value = (CompressedBytes)val; 2625 value.reset(in, valLength); 2626 } else { 2627 UncompressedBytes value = (UncompressedBytes)val; 2628 value.reset(in, valLength); 2629 } 2630 2631 return valLength; 2632 } else { 2633 int valLength = WritableUtils.readVInt(valLenIn); 2634 UncompressedBytes rawValue = (UncompressedBytes)val; 2635 rawValue.reset(valIn, valLength); 2636 --noBufferedValues; 2637 return valLength; 2638 } 2639 2640 } 2641 2642 private void handleChecksumException(ChecksumException e) 2643 throws IOException { 2644 if (this.conf.getBoolean("io.skip.checksum.errors", false)) { 2645 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2646 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2647 } else { 2648 throw e; 2649 } 2650 } 2651 2652 /** disables sync. often invoked for tmp files */ 2653 synchronized void ignoreSync() { 2654 sync = null; 2655 } 2656 2657 /** Set the current byte position in the input file. 2658 * 2659 * <p>The position passed must be a position returned by {@link 2660 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2661 * position, use {@link SequenceFile.Reader#sync(long)}. 2662 */ 2663 public synchronized void seek(long position) throws IOException { 2664 in.seek(position); 2665 if (blockCompressed) { // trigger block read 2666 noBufferedKeys = 0; 2667 valuesDecompressed = true; 2668 } 2669 } 2670 2671 /** Seek to the next sync mark past a given position.*/ 2672 public synchronized void sync(long position) throws IOException { 2673 if (position+SYNC_SIZE >= end) { 2674 seek(end); 2675 return; 2676 } 2677 2678 if (position < headerEnd) { 2679 // seek directly to first record 2680 in.seek(headerEnd); 2681 // note the sync marker "seen" in the header 2682 syncSeen = true; 2683 return; 2684 } 2685 2686 try { 2687 seek(position+4); // skip escape 2688 in.readFully(syncCheck); 2689 int syncLen = sync.length; 2690 for (int i = 0; in.getPos() < end; i++) { 2691 int j = 0; 2692 for (; j < syncLen; j++) { 2693 if (sync[j] != syncCheck[(i+j)%syncLen]) 2694 break; 2695 } 2696 if (j == syncLen) { 2697 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2698 return; 2699 } 2700 syncCheck[i%syncLen] = in.readByte(); 2701 } 2702 } catch (ChecksumException e) { // checksum failure 2703 handleChecksumException(e); 2704 } 2705 } 2706 2707 /** Returns true iff the previous call to next passed a sync mark.*/ 2708 public synchronized boolean syncSeen() { return syncSeen; } 2709 2710 /** Return the current byte position in the input file. */ 2711 public synchronized long getPosition() throws IOException { 2712 return in.getPos(); 2713 } 2714 2715 /** Returns the name of the file. */ 2716 @Override 2717 public String toString() { 2718 return filename; 2719 } 2720 2721 } 2722 2723 /** Sorts key/value pairs in a sequence-format file. 2724 * 2725 * <p>For best performance, applications should make sure that the {@link 2726 * Writable#readFields(DataInput)} implementation of their keys is 2727 * very efficient. In particular, it should avoid allocating memory. 2728 */ 2729 public static class Sorter { 2730 2731 private RawComparator comparator; 2732 2733 private MergeSort mergeSort; //the implementation of merge sort 2734 2735 private Path[] inFiles; // when merging or sorting 2736 2737 private Path outFile; 2738 2739 private int memory; // bytes 2740 private int factor; // merged per pass 2741 2742 private FileSystem fs = null; 2743 2744 private Class keyClass; 2745 private Class valClass; 2746 2747 private Configuration conf; 2748 private Metadata metadata; 2749 2750 private Progressable progressable = null; 2751 2752 /** Sort and merge files containing the named classes. */ 2753 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2754 Class valClass, Configuration conf) { 2755 this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf); 2756 } 2757 2758 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2759 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2760 Class valClass, Configuration conf) { 2761 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2762 } 2763 2764 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2765 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2766 Class valClass, Configuration conf, Metadata metadata) { 2767 this.fs = fs; 2768 this.comparator = comparator; 2769 this.keyClass = keyClass; 2770 this.valClass = valClass; 2771 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2772 this.factor = conf.getInt("io.sort.factor", 100); 2773 this.conf = conf; 2774 this.metadata = metadata; 2775 } 2776 2777 /** Set the number of streams to merge at once.*/ 2778 public void setFactor(int factor) { this.factor = factor; } 2779 2780 /** Get the number of streams to merge at once.*/ 2781 public int getFactor() { return factor; } 2782 2783 /** Set the total amount of buffer memory, in bytes.*/ 2784 public void setMemory(int memory) { this.memory = memory; } 2785 2786 /** Get the total amount of buffer memory, in bytes.*/ 2787 public int getMemory() { return memory; } 2788 2789 /** Set the progressable object in order to report progress. */ 2790 public void setProgressable(Progressable progressable) { 2791 this.progressable = progressable; 2792 } 2793 2794 /** 2795 * Perform a file sort from a set of input files into an output file. 2796 * @param inFiles the files to be sorted 2797 * @param outFile the sorted output file 2798 * @param deleteInput should the input files be deleted as they are read? 2799 */ 2800 public void sort(Path[] inFiles, Path outFile, 2801 boolean deleteInput) throws IOException { 2802 if (fs.exists(outFile)) { 2803 throw new IOException("already exists: " + outFile); 2804 } 2805 2806 this.inFiles = inFiles; 2807 this.outFile = outFile; 2808 2809 int segments = sortPass(deleteInput); 2810 if (segments > 1) { 2811 mergePass(outFile.getParent()); 2812 } 2813 } 2814 2815 /** 2816 * Perform a file sort from a set of input files and return an iterator. 2817 * @param inFiles the files to be sorted 2818 * @param tempDir the directory where temp files are created during sort 2819 * @param deleteInput should the input files be deleted as they are read? 2820 * @return iterator the RawKeyValueIterator 2821 */ 2822 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2823 boolean deleteInput) throws IOException { 2824 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2825 if (fs.exists(outFile)) { 2826 throw new IOException("already exists: " + outFile); 2827 } 2828 this.inFiles = inFiles; 2829 //outFile will basically be used as prefix for temp files in the cases 2830 //where sort outputs multiple sorted segments. For the single segment 2831 //case, the outputFile itself will contain the sorted data for that 2832 //segment 2833 this.outFile = outFile; 2834 2835 int segments = sortPass(deleteInput); 2836 if (segments > 1) 2837 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2838 tempDir); 2839 else if (segments == 1) 2840 return merge(new Path[]{outFile}, true, tempDir); 2841 else return null; 2842 } 2843 2844 /** 2845 * The backwards compatible interface to sort. 2846 * @param inFile the input file to sort 2847 * @param outFile the sorted output file 2848 */ 2849 public void sort(Path inFile, Path outFile) throws IOException { 2850 sort(new Path[]{inFile}, outFile, false); 2851 } 2852 2853 private int sortPass(boolean deleteInput) throws IOException { 2854 if(LOG.isDebugEnabled()) { 2855 LOG.debug("running sort pass"); 2856 } 2857 SortPass sortPass = new SortPass(); // make the SortPass 2858 sortPass.setProgressable(progressable); 2859 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2860 try { 2861 return sortPass.run(deleteInput); // run it 2862 } finally { 2863 sortPass.close(); // close it 2864 } 2865 } 2866 2867 private class SortPass { 2868 private int memoryLimit = memory/4; 2869 private int recordLimit = 1000000; 2870 2871 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2872 private byte[] rawBuffer; 2873 2874 private int[] keyOffsets = new int[1024]; 2875 private int[] pointers = new int[keyOffsets.length]; 2876 private int[] pointersCopy = new int[keyOffsets.length]; 2877 private int[] keyLengths = new int[keyOffsets.length]; 2878 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2879 2880 private ArrayList segmentLengths = new ArrayList(); 2881 2882 private Reader in = null; 2883 private FSDataOutputStream out = null; 2884 private FSDataOutputStream indexOut = null; 2885 private Path outName; 2886 2887 private Progressable progressable = null; 2888 2889 public int run(boolean deleteInput) throws IOException { 2890 int segments = 0; 2891 int currentFile = 0; 2892 boolean atEof = (currentFile >= inFiles.length); 2893 CompressionType compressionType; 2894 CompressionCodec codec = null; 2895 segmentLengths.clear(); 2896 if (atEof) { 2897 return 0; 2898 } 2899 2900 // Initialize 2901 in = new Reader(fs, inFiles[currentFile], conf); 2902 compressionType = in.getCompressionType(); 2903 codec = in.getCompressionCodec(); 2904 2905 for (int i=0; i < rawValues.length; ++i) { 2906 rawValues[i] = null; 2907 } 2908 2909 while (!atEof) { 2910 int count = 0; 2911 int bytesProcessed = 0; 2912 rawKeys.reset(); 2913 while (!atEof && 2914 bytesProcessed < memoryLimit && count < recordLimit) { 2915 2916 // Read a record into buffer 2917 // Note: Attempt to re-use 'rawValue' as far as possible 2918 int keyOffset = rawKeys.getLength(); 2919 ValueBytes rawValue = 2920 (count == keyOffsets.length || rawValues[count] == null) ? 2921 in.createValueBytes() : 2922 rawValues[count]; 2923 int recordLength = in.nextRaw(rawKeys, rawValue); 2924 if (recordLength == -1) { 2925 in.close(); 2926 if (deleteInput) { 2927 fs.delete(inFiles[currentFile], true); 2928 } 2929 currentFile += 1; 2930 atEof = currentFile >= inFiles.length; 2931 if (!atEof) { 2932 in = new Reader(fs, inFiles[currentFile], conf); 2933 } else { 2934 in = null; 2935 } 2936 continue; 2937 } 2938 2939 int keyLength = rawKeys.getLength() - keyOffset; 2940 2941 if (count == keyOffsets.length) 2942 grow(); 2943 2944 keyOffsets[count] = keyOffset; // update pointers 2945 pointers[count] = count; 2946 keyLengths[count] = keyLength; 2947 rawValues[count] = rawValue; 2948 2949 bytesProcessed += recordLength; 2950 count++; 2951 } 2952 2953 // buffer is full -- sort & flush it 2954 if(LOG.isDebugEnabled()) { 2955 LOG.debug("flushing segment " + segments); 2956 } 2957 rawBuffer = rawKeys.getData(); 2958 sort(count); 2959 // indicate we're making progress 2960 if (progressable != null) { 2961 progressable.progress(); 2962 } 2963 flush(count, bytesProcessed, compressionType, codec, 2964 segments==0 && atEof); 2965 segments++; 2966 } 2967 return segments; 2968 } 2969 2970 public void close() throws IOException { 2971 if (in != null) { 2972 in.close(); 2973 } 2974 if (out != null) { 2975 out.close(); 2976 } 2977 if (indexOut != null) { 2978 indexOut.close(); 2979 } 2980 } 2981 2982 private void grow() { 2983 int newLength = keyOffsets.length * 3 / 2; 2984 keyOffsets = grow(keyOffsets, newLength); 2985 pointers = grow(pointers, newLength); 2986 pointersCopy = new int[newLength]; 2987 keyLengths = grow(keyLengths, newLength); 2988 rawValues = grow(rawValues, newLength); 2989 } 2990 2991 private int[] grow(int[] old, int newLength) { 2992 int[] result = new int[newLength]; 2993 System.arraycopy(old, 0, result, 0, old.length); 2994 return result; 2995 } 2996 2997 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 2998 ValueBytes[] result = new ValueBytes[newLength]; 2999 System.arraycopy(old, 0, result, 0, old.length); 3000 for (int i=old.length; i < newLength; ++i) { 3001 result[i] = null; 3002 } 3003 return result; 3004 } 3005 3006 private void flush(int count, int bytesProcessed, 3007 CompressionType compressionType, 3008 CompressionCodec codec, 3009 boolean done) throws IOException { 3010 if (out == null) { 3011 outName = done ? outFile : outFile.suffix(".0"); 3012 out = fs.create(outName); 3013 if (!done) { 3014 indexOut = fs.create(outName.suffix(".index")); 3015 } 3016 } 3017 3018 long segmentStart = out.getPos(); 3019 Writer writer = createWriter(conf, Writer.stream(out), 3020 Writer.keyClass(keyClass), Writer.valueClass(valClass), 3021 Writer.compression(compressionType, codec), 3022 Writer.metadata(done ? metadata : new Metadata())); 3023 3024 if (!done) { 3025 writer.sync = null; // disable sync on temp files 3026 } 3027 3028 for (int i = 0; i < count; i++) { // write in sorted order 3029 int p = pointers[i]; 3030 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); 3031 } 3032 writer.close(); 3033 3034 if (!done) { 3035 // Save the segment length 3036 WritableUtils.writeVLong(indexOut, segmentStart); 3037 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart)); 3038 indexOut.flush(); 3039 } 3040 } 3041 3042 private void sort(int count) { 3043 System.arraycopy(pointers, 0, pointersCopy, 0, count); 3044 mergeSort.mergeSort(pointersCopy, pointers, 0, count); 3045 } 3046 class SeqFileComparator implements Comparator<IntWritable> { 3047 @Override 3048 public int compare(IntWritable I, IntWritable J) { 3049 return comparator.compare(rawBuffer, keyOffsets[I.get()], 3050 keyLengths[I.get()], rawBuffer, 3051 keyOffsets[J.get()], keyLengths[J.get()]); 3052 } 3053 } 3054 3055 /** set the progressable object in order to report progress */ 3056 public void setProgressable(Progressable progressable) 3057 { 3058 this.progressable = progressable; 3059 } 3060 3061 } // SequenceFile.Sorter.SortPass 3062 3063 /** The interface to iterate over raw keys/values of SequenceFiles. */ 3064 public static interface RawKeyValueIterator { 3065 /** Gets the current raw key 3066 * @return DataOutputBuffer 3067 * @throws IOException 3068 */ 3069 DataOutputBuffer getKey() throws IOException; 3070 /** Gets the current raw value 3071 * @return ValueBytes 3072 * @throws IOException 3073 */ 3074 ValueBytes getValue() throws IOException; 3075 /** Sets up the current key and value (for getKey and getValue) 3076 * @return true if there exists a key/value, false otherwise 3077 * @throws IOException 3078 */ 3079 boolean next() throws IOException; 3080 /** closes the iterator so that the underlying streams can be closed 3081 * @throws IOException 3082 */ 3083 void close() throws IOException; 3084 /** Gets the Progress object; this has a float (0.0 - 1.0) 3085 * indicating the bytes processed by the iterator so far 3086 */ 3087 Progress getProgress(); 3088 } 3089 3090 /** 3091 * Merges the list of segments of type <code>SegmentDescriptor</code> 3092 * @param segments the list of SegmentDescriptors 3093 * @param tmpDir the directory to write temporary files into 3094 * @return RawKeyValueIterator 3095 * @throws IOException 3096 */ 3097 public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 3098 Path tmpDir) 3099 throws IOException { 3100 // pass in object to report progress, if present 3101 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); 3102 return mQueue.merge(); 3103 } 3104 3105 /** 3106 * Merges the contents of files passed in Path[] using a max factor value 3107 * that is already set 3108 * @param inNames the array of path names 3109 * @param deleteInputs true if the input files should be deleted when 3110 * unnecessary 3111 * @param tmpDir the directory to write temporary files into 3112 * @return RawKeyValueIteratorMergeQueue 3113 * @throws IOException 3114 */ 3115 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3116 Path tmpDir) 3117 throws IOException { 3118 return merge(inNames, deleteInputs, 3119 (inNames.length < factor) ? inNames.length : factor, 3120 tmpDir); 3121 } 3122 3123 /** 3124 * Merges the contents of files passed in Path[] 3125 * @param inNames the array of path names 3126 * @param deleteInputs true if the input files should be deleted when 3127 * unnecessary 3128 * @param factor the factor that will be used as the maximum merge fan-in 3129 * @param tmpDir the directory to write temporary files into 3130 * @return RawKeyValueIteratorMergeQueue 3131 * @throws IOException 3132 */ 3133 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 3134 int factor, Path tmpDir) 3135 throws IOException { 3136 //get the segments from inNames 3137 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3138 for (int i = 0; i < inNames.length; i++) { 3139 SegmentDescriptor s = new SegmentDescriptor(0, 3140 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3141 s.preserveInput(!deleteInputs); 3142 s.doSync(); 3143 a.add(s); 3144 } 3145 this.factor = factor; 3146 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); 3147 return mQueue.merge(); 3148 } 3149 3150 /** 3151 * Merges the contents of files passed in Path[] 3152 * @param inNames the array of path names 3153 * @param tempDir the directory for creating temp files during merge 3154 * @param deleteInputs true if the input files should be deleted when 3155 * unnecessary 3156 * @return RawKeyValueIteratorMergeQueue 3157 * @throws IOException 3158 */ 3159 public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 3160 boolean deleteInputs) 3161 throws IOException { 3162 //outFile will basically be used as prefix for temp files for the 3163 //intermediate merge outputs 3164 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged"); 3165 //get the segments from inNames 3166 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3167 for (int i = 0; i < inNames.length; i++) { 3168 SegmentDescriptor s = new SegmentDescriptor(0, 3169 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3170 s.preserveInput(!deleteInputs); 3171 s.doSync(); 3172 a.add(s); 3173 } 3174 factor = (inNames.length < factor) ? inNames.length : factor; 3175 // pass in object to report progress, if present 3176 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); 3177 return mQueue.merge(); 3178 } 3179 3180 /** 3181 * Clones the attributes (like compression of the input file and creates a 3182 * corresponding Writer 3183 * @param inputFile the path of the input file whose attributes should be 3184 * cloned 3185 * @param outputFile the path of the output file 3186 * @param prog the Progressable to report status during the file write 3187 * @return Writer 3188 * @throws IOException 3189 */ 3190 public Writer cloneFileAttributes(Path inputFile, Path outputFile, 3191 Progressable prog) throws IOException { 3192 Reader reader = new Reader(conf, 3193 Reader.file(inputFile), 3194 new Reader.OnlyHeaderOption()); 3195 CompressionType compress = reader.getCompressionType(); 3196 CompressionCodec codec = reader.getCompressionCodec(); 3197 reader.close(); 3198 3199 Writer writer = createWriter(conf, 3200 Writer.file(outputFile), 3201 Writer.keyClass(keyClass), 3202 Writer.valueClass(valClass), 3203 Writer.compression(compress, codec), 3204 Writer.progressable(prog)); 3205 return writer; 3206 } 3207 3208 /** 3209 * Writes records from RawKeyValueIterator into a file represented by the 3210 * passed writer 3211 * @param records the RawKeyValueIterator 3212 * @param writer the Writer created earlier 3213 * @throws IOException 3214 */ 3215 public void writeFile(RawKeyValueIterator records, Writer writer) 3216 throws IOException { 3217 while(records.next()) { 3218 writer.appendRaw(records.getKey().getData(), 0, 3219 records.getKey().getLength(), records.getValue()); 3220 } 3221 writer.sync(); 3222 } 3223 3224 /** Merge the provided files. 3225 * @param inFiles the array of input path names 3226 * @param outFile the final output file 3227 * @throws IOException 3228 */ 3229 public void merge(Path[] inFiles, Path outFile) throws IOException { 3230 if (fs.exists(outFile)) { 3231 throw new IOException("already exists: " + outFile); 3232 } 3233 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); 3234 Writer writer = cloneFileAttributes(inFiles[0], outFile, null); 3235 3236 writeFile(r, writer); 3237 3238 writer.close(); 3239 } 3240 3241 /** sort calls this to generate the final merged output */ 3242 private int mergePass(Path tmpDir) throws IOException { 3243 if(LOG.isDebugEnabled()) { 3244 LOG.debug("running merge pass"); 3245 } 3246 Writer writer = cloneFileAttributes( 3247 outFile.suffix(".0"), outFile, null); 3248 RawKeyValueIterator r = merge(outFile.suffix(".0"), 3249 outFile.suffix(".0.index"), tmpDir); 3250 writeFile(r, writer); 3251 3252 writer.close(); 3253 return 0; 3254 } 3255 3256 /** Used by mergePass to merge the output of the sort 3257 * @param inName the name of the input file containing sorted segments 3258 * @param indexIn the offsets of the sorted segments 3259 * @param tmpDir the relative directory to store intermediate results in 3260 * @return RawKeyValueIterator 3261 * @throws IOException 3262 */ 3263 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 3264 throws IOException { 3265 //get the segments from indexIn 3266 //we create a SegmentContainer so that we can track segments belonging to 3267 //inName and delete inName as soon as we see that we have looked at all 3268 //the contained segments during the merge process & hence don't need 3269 //them anymore 3270 SegmentContainer container = new SegmentContainer(inName, indexIn); 3271 MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable); 3272 return mQueue.merge(); 3273 } 3274 3275 /** This class implements the core of the merge logic */ 3276 private class MergeQueue extends PriorityQueue 3277 implements RawKeyValueIterator { 3278 private boolean compress; 3279 private boolean blockCompress; 3280 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3281 private ValueBytes rawValue; 3282 private long totalBytesProcessed; 3283 private float progPerByte; 3284 private Progress mergeProgress = new Progress(); 3285 private Path tmpDir; 3286 private Progressable progress = null; //handle to the progress reporting object 3287 private SegmentDescriptor minSegment; 3288 3289 //a TreeMap used to store the segments sorted by size (segment offset and 3290 //segment path name is used to break ties between segments of same sizes) 3291 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3292 new TreeMap<SegmentDescriptor, Void>(); 3293 3294 @SuppressWarnings("unchecked") 3295 public void put(SegmentDescriptor stream) throws IOException { 3296 if (size() == 0) { 3297 compress = stream.in.isCompressed(); 3298 blockCompress = stream.in.isBlockCompressed(); 3299 } else if (compress != stream.in.isCompressed() || 3300 blockCompress != stream.in.isBlockCompressed()) { 3301 throw new IOException("All merged files must be compressed or not."); 3302 } 3303 super.put(stream); 3304 } 3305 3306 /** 3307 * A queue of file segments to merge 3308 * @param segments the file segments to merge 3309 * @param tmpDir a relative local directory to save intermediate files in 3310 * @param progress the reference to the Progressable object 3311 */ 3312 public MergeQueue(List <SegmentDescriptor> segments, 3313 Path tmpDir, Progressable progress) { 3314 int size = segments.size(); 3315 for (int i = 0; i < size; i++) { 3316 sortedSegmentSizes.put(segments.get(i), null); 3317 } 3318 this.tmpDir = tmpDir; 3319 this.progress = progress; 3320 } 3321 @Override 3322 protected boolean lessThan(Object a, Object b) { 3323 // indicate we're making progress 3324 if (progress != null) { 3325 progress.progress(); 3326 } 3327 SegmentDescriptor msa = (SegmentDescriptor)a; 3328 SegmentDescriptor msb = (SegmentDescriptor)b; 3329 return comparator.compare(msa.getKey().getData(), 0, 3330 msa.getKey().getLength(), msb.getKey().getData(), 0, 3331 msb.getKey().getLength()) < 0; 3332 } 3333 @Override 3334 public void close() throws IOException { 3335 SegmentDescriptor ms; // close inputs 3336 while ((ms = (SegmentDescriptor)pop()) != null) { 3337 ms.cleanup(); 3338 } 3339 minSegment = null; 3340 } 3341 @Override 3342 public DataOutputBuffer getKey() throws IOException { 3343 return rawKey; 3344 } 3345 @Override 3346 public ValueBytes getValue() throws IOException { 3347 return rawValue; 3348 } 3349 @Override 3350 public boolean next() throws IOException { 3351 if (size() == 0) 3352 return false; 3353 if (minSegment != null) { 3354 //minSegment is non-null for all invocations of next except the first 3355 //one. For the first invocation, the priority queue is ready for use 3356 //but for the subsequent invocations, first adjust the queue 3357 adjustPriorityQueue(minSegment); 3358 if (size() == 0) { 3359 minSegment = null; 3360 return false; 3361 } 3362 } 3363 minSegment = (SegmentDescriptor)top(); 3364 long startPos = minSegment.in.getPosition(); // Current position in stream 3365 //save the raw key reference 3366 rawKey = minSegment.getKey(); 3367 //load the raw value. Re-use the existing rawValue buffer 3368 if (rawValue == null) { 3369 rawValue = minSegment.in.createValueBytes(); 3370 } 3371 minSegment.nextRawValue(rawValue); 3372 long endPos = minSegment.in.getPosition(); // End position after reading value 3373 updateProgress(endPos - startPos); 3374 return true; 3375 } 3376 3377 @Override 3378 public Progress getProgress() { 3379 return mergeProgress; 3380 } 3381 3382 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3383 long startPos = ms.in.getPosition(); // Current position in stream 3384 boolean hasNext = ms.nextRawKey(); 3385 long endPos = ms.in.getPosition(); // End position after reading key 3386 updateProgress(endPos - startPos); 3387 if (hasNext) { 3388 adjustTop(); 3389 } else { 3390 pop(); 3391 ms.cleanup(); 3392 } 3393 } 3394 3395 private void updateProgress(long bytesProcessed) { 3396 totalBytesProcessed += bytesProcessed; 3397 if (progPerByte > 0) { 3398 mergeProgress.set(totalBytesProcessed * progPerByte); 3399 } 3400 } 3401 3402 /** This is the single level merge that is called multiple times 3403 * depending on the factor size and the number of segments 3404 * @return RawKeyValueIterator 3405 * @throws IOException 3406 */ 3407 public RawKeyValueIterator merge() throws IOException { 3408 //create the MergeStreams from the sorted map created in the constructor 3409 //and dump the final output to a file 3410 int numSegments = sortedSegmentSizes.size(); 3411 int origFactor = factor; 3412 int passNo = 1; 3413 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3414 do { 3415 //get the factor for this pass of merge 3416 factor = getPassFactor(passNo, numSegments); 3417 List<SegmentDescriptor> segmentsToMerge = 3418 new ArrayList<SegmentDescriptor>(); 3419 int segmentsConsidered = 0; 3420 int numSegmentsToConsider = factor; 3421 while (true) { 3422 //extract the smallest 'factor' number of segment pointers from the 3423 //TreeMap. Call cleanup on the empty segments (no key/value data) 3424 SegmentDescriptor[] mStream = 3425 getSegmentDescriptors(numSegmentsToConsider); 3426 for (int i = 0; i < mStream.length; i++) { 3427 if (mStream[i].nextRawKey()) { 3428 segmentsToMerge.add(mStream[i]); 3429 segmentsConsidered++; 3430 // Count the fact that we read some bytes in calling nextRawKey() 3431 updateProgress(mStream[i].in.getPosition()); 3432 } 3433 else { 3434 mStream[i].cleanup(); 3435 numSegments--; //we ignore this segment for the merge 3436 } 3437 } 3438 //if we have the desired number of segments 3439 //or looked at all available segments, we break 3440 if (segmentsConsidered == factor || 3441 sortedSegmentSizes.size() == 0) { 3442 break; 3443 } 3444 3445 numSegmentsToConsider = factor - segmentsConsidered; 3446 } 3447 //feed the streams to the priority queue 3448 initialize(segmentsToMerge.size()); clear(); 3449 for (int i = 0; i < segmentsToMerge.size(); i++) { 3450 put(segmentsToMerge.get(i)); 3451 } 3452 //if we have lesser number of segments remaining, then just return the 3453 //iterator, else do another single level merge 3454 if (numSegments <= factor) { 3455 //calculate the length of the remaining segments. Required for 3456 //calculating the merge progress 3457 long totalBytes = 0; 3458 for (int i = 0; i < segmentsToMerge.size(); i++) { 3459 totalBytes += segmentsToMerge.get(i).segmentLength; 3460 } 3461 if (totalBytes != 0) //being paranoid 3462 progPerByte = 1.0f / (float)totalBytes; 3463 //reset factor to what it originally was 3464 factor = origFactor; 3465 return this; 3466 } else { 3467 //we want to spread the creation of temp files on multiple disks if 3468 //available under the space constraints 3469 long approxOutputSize = 0; 3470 for (SegmentDescriptor s : segmentsToMerge) { 3471 approxOutputSize += s.segmentLength + 3472 ChecksumFileSystem.getApproxChkSumLength( 3473 s.segmentLength); 3474 } 3475 Path tmpFilename = 3476 new Path(tmpDir, "intermediate").suffix("." + passNo); 3477 3478 Path outputFile = lDirAlloc.getLocalPathForWrite( 3479 tmpFilename.toString(), 3480 approxOutputSize, conf); 3481 if(LOG.isDebugEnabled()) { 3482 LOG.debug("writing intermediate results to " + outputFile); 3483 } 3484 Writer writer = cloneFileAttributes( 3485 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3486 fs.makeQualified(outputFile), null); 3487 writer.sync = null; //disable sync for temp files 3488 writeFile(this, writer); 3489 writer.close(); 3490 3491 //we finished one single level merge; now clean up the priority 3492 //queue 3493 this.close(); 3494 3495 SegmentDescriptor tempSegment = 3496 new SegmentDescriptor(0, 3497 fs.getFileStatus(outputFile).getLen(), outputFile); 3498 //put the segment back in the TreeMap 3499 sortedSegmentSizes.put(tempSegment, null); 3500 numSegments = sortedSegmentSizes.size(); 3501 passNo++; 3502 } 3503 //we are worried about only the first pass merge factor. So reset the 3504 //factor to what it originally was 3505 factor = origFactor; 3506 } while(true); 3507 } 3508 3509 //Hadoop-591 3510 public int getPassFactor(int passNo, int numSegments) { 3511 if (passNo > 1 || numSegments <= factor || factor == 1) 3512 return factor; 3513 int mod = (numSegments - 1) % (factor - 1); 3514 if (mod == 0) 3515 return factor; 3516 return mod + 1; 3517 } 3518 3519 /** Return (& remove) the requested number of segment descriptors from the 3520 * sorted map. 3521 */ 3522 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3523 if (numDescriptors > sortedSegmentSizes.size()) 3524 numDescriptors = sortedSegmentSizes.size(); 3525 SegmentDescriptor[] SegmentDescriptors = 3526 new SegmentDescriptor[numDescriptors]; 3527 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3528 int i = 0; 3529 while (i < numDescriptors) { 3530 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3531 iter.remove(); 3532 } 3533 return SegmentDescriptors; 3534 } 3535 } // SequenceFile.Sorter.MergeQueue 3536 3537 /** This class defines a merge segment. This class can be subclassed to 3538 * provide a customized cleanup method implementation. In this 3539 * implementation, cleanup closes the file handle and deletes the file 3540 */ 3541 public class SegmentDescriptor implements Comparable { 3542 3543 long segmentOffset; //the start of the segment in the file 3544 long segmentLength; //the length of the segment 3545 Path segmentPathName; //the path name of the file containing the segment 3546 boolean ignoreSync = true; //set to true for temp files 3547 private Reader in = null; 3548 private DataOutputBuffer rawKey = null; //this will hold the current key 3549 private boolean preserveInput = false; //delete input segment files? 3550 3551 /** Constructs a segment 3552 * @param segmentOffset the offset of the segment in the file 3553 * @param segmentLength the length of the segment 3554 * @param segmentPathName the path name of the file containing the segment 3555 */ 3556 public SegmentDescriptor (long segmentOffset, long segmentLength, 3557 Path segmentPathName) { 3558 this.segmentOffset = segmentOffset; 3559 this.segmentLength = segmentLength; 3560 this.segmentPathName = segmentPathName; 3561 } 3562 3563 /** Do the sync checks */ 3564 public void doSync() {ignoreSync = false;} 3565 3566 /** Whether to delete the files when no longer needed */ 3567 public void preserveInput(boolean preserve) { 3568 preserveInput = preserve; 3569 } 3570 3571 public boolean shouldPreserveInput() { 3572 return preserveInput; 3573 } 3574 3575 @Override 3576 public int compareTo(Object o) { 3577 SegmentDescriptor that = (SegmentDescriptor)o; 3578 if (this.segmentLength != that.segmentLength) { 3579 return (this.segmentLength < that.segmentLength ? -1 : 1); 3580 } 3581 if (this.segmentOffset != that.segmentOffset) { 3582 return (this.segmentOffset < that.segmentOffset ? -1 : 1); 3583 } 3584 return (this.segmentPathName.toString()). 3585 compareTo(that.segmentPathName.toString()); 3586 } 3587 3588 @Override 3589 public boolean equals(Object o) { 3590 if (!(o instanceof SegmentDescriptor)) { 3591 return false; 3592 } 3593 SegmentDescriptor that = (SegmentDescriptor)o; 3594 if (this.segmentLength == that.segmentLength && 3595 this.segmentOffset == that.segmentOffset && 3596 this.segmentPathName.toString().equals( 3597 that.segmentPathName.toString())) { 3598 return true; 3599 } 3600 return false; 3601 } 3602 3603 @Override 3604 public int hashCode() { 3605 return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32)); 3606 } 3607 3608 /** Fills up the rawKey object with the key returned by the Reader 3609 * @return true if there is a key returned; false, otherwise 3610 * @throws IOException 3611 */ 3612 public boolean nextRawKey() throws IOException { 3613 if (in == null) { 3614 int bufferSize = getBufferSize(conf); 3615 Reader reader = new Reader(conf, 3616 Reader.file(segmentPathName), 3617 Reader.bufferSize(bufferSize), 3618 Reader.start(segmentOffset), 3619 Reader.length(segmentLength)); 3620 3621 //sometimes we ignore syncs especially for temp merge files 3622 if (ignoreSync) reader.ignoreSync(); 3623 3624 if (reader.getKeyClass() != keyClass) 3625 throw new IOException("wrong key class: " + reader.getKeyClass() + 3626 " is not " + keyClass); 3627 if (reader.getValueClass() != valClass) 3628 throw new IOException("wrong value class: "+reader.getValueClass()+ 3629 " is not " + valClass); 3630 this.in = reader; 3631 rawKey = new DataOutputBuffer(); 3632 } 3633 rawKey.reset(); 3634 int keyLength = 3635 in.nextRawKey(rawKey); 3636 return (keyLength >= 0); 3637 } 3638 3639 /** Fills up the passed rawValue with the value corresponding to the key 3640 * read earlier 3641 * @param rawValue 3642 * @return the length of the value 3643 * @throws IOException 3644 */ 3645 public int nextRawValue(ValueBytes rawValue) throws IOException { 3646 int valLength = in.nextRawValue(rawValue); 3647 return valLength; 3648 } 3649 3650 /** Returns the stored rawKey */ 3651 public DataOutputBuffer getKey() { 3652 return rawKey; 3653 } 3654 3655 /** closes the underlying reader */ 3656 private void close() throws IOException { 3657 this.in.close(); 3658 this.in = null; 3659 } 3660 3661 /** The default cleanup. Subclasses can override this with a custom 3662 * cleanup 3663 */ 3664 public void cleanup() throws IOException { 3665 close(); 3666 if (!preserveInput) { 3667 fs.delete(segmentPathName, true); 3668 } 3669 } 3670 } // SequenceFile.Sorter.SegmentDescriptor 3671 3672 /** This class provisions multiple segments contained within a single 3673 * file 3674 */ 3675 private class LinkedSegmentsDescriptor extends SegmentDescriptor { 3676 3677 SegmentContainer parentContainer = null; 3678 3679 /** Constructs a segment 3680 * @param segmentOffset the offset of the segment in the file 3681 * @param segmentLength the length of the segment 3682 * @param segmentPathName the path name of the file containing the segment 3683 * @param parent the parent SegmentContainer that holds the segment 3684 */ 3685 public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 3686 Path segmentPathName, SegmentContainer parent) { 3687 super(segmentOffset, segmentLength, segmentPathName); 3688 this.parentContainer = parent; 3689 } 3690 /** The default cleanup. Subclasses can override this with a custom 3691 * cleanup 3692 */ 3693 @Override 3694 public void cleanup() throws IOException { 3695 super.close(); 3696 if (super.shouldPreserveInput()) return; 3697 parentContainer.cleanup(); 3698 } 3699 3700 @Override 3701 public boolean equals(Object o) { 3702 if (!(o instanceof LinkedSegmentsDescriptor)) { 3703 return false; 3704 } 3705 return super.equals(o); 3706 } 3707 } //SequenceFile.Sorter.LinkedSegmentsDescriptor 3708 3709 /** The class that defines a container for segments to be merged. Primarily 3710 * required to delete temp files as soon as all the contained segments 3711 * have been looked at */ 3712 private class SegmentContainer { 3713 private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups 3714 private int numSegmentsContained; //# of segments contained 3715 private Path inName; //input file from where segments are created 3716 3717 //the list of segments read from the file 3718 private ArrayList <SegmentDescriptor> segments = 3719 new ArrayList <SegmentDescriptor>(); 3720 /** This constructor is there primarily to serve the sort routine that 3721 * generates a single output file with an associated index file */ 3722 public SegmentContainer(Path inName, Path indexIn) throws IOException { 3723 //get the segments from indexIn 3724 FSDataInputStream fsIndexIn = fs.open(indexIn); 3725 long end = fs.getFileStatus(indexIn).getLen(); 3726 while (fsIndexIn.getPos() < end) { 3727 long segmentOffset = WritableUtils.readVLong(fsIndexIn); 3728 long segmentLength = WritableUtils.readVLong(fsIndexIn); 3729 Path segmentName = inName; 3730 segments.add(new LinkedSegmentsDescriptor(segmentOffset, 3731 segmentLength, segmentName, this)); 3732 } 3733 fsIndexIn.close(); 3734 fs.delete(indexIn, true); 3735 numSegmentsContained = segments.size(); 3736 this.inName = inName; 3737 } 3738 3739 public List <SegmentDescriptor> getSegmentList() { 3740 return segments; 3741 } 3742 public void cleanup() throws IOException { 3743 numSegmentsCleanedUp++; 3744 if (numSegmentsCleanedUp == numSegmentsContained) { 3745 fs.delete(inName, true); 3746 } 3747 } 3748 } //SequenceFile.Sorter.SegmentContainer 3749 3750 } // SequenceFile.Sorter 3751 3752} // SequenceFile