/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_KEY;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can
 * be specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of
 * the above <code>SequenceFile</code> formats.</p>
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every <code>SYNC_INTERVAL</code> (2000) bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker approximately every <code>SYNC_INTERVAL</code> (2000) bytes.
 *   </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every block.
 *   </li>
 * </ul>
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in zero-compressed integer
 * (VInt) format.</p>
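 *
 * <p>A minimal usage sketch (the path and key/value types here are
 * illustrative, not prescribed by the API):</p>
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Path path = new Path("/tmp/example.seq");
 * try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *          SequenceFile.Writer.file(path),
 *          SequenceFile.Writer.keyClass(LongWritable.class),
 *          SequenceFile.Writer.valueClass(Text.class))) {
 *   writer.append(new LongWritable(1L), new Text("one"));
 * }
 *
 * LongWritable key = new LongWritable();
 * Text val = new Text();
 * try (SequenceFile.Reader reader =
 *          new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
 *   while (reader.next(key, val)) {
 *     System.out.println(key + "\t" + val);
 *   }
 * }
 * }</pre>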
 *
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static final byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  public static CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  public static void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }
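
  // Illustrative sketch (not part of the API): selecting the writer variant
  // through the compression option. The path and types are assumptions for
  // the example.
  //
  //   SequenceFile.Writer w = SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(new Path("/tmp/block.seq")),
  //       SequenceFile.Writer.keyClass(Text.class),
  //       SequenceFile.Writer.valueClass(BytesWritable.class),
  //       SequenceFile.Writer.compression(CompressionType.BLOCK,
  //                                       new DefaultCodec()));
  //
  // CompressionType.NONE yields a plain Writer, RECORD a RecordCompressWriter,
  // and BLOCK a BlockCompressWriter, exactly as in the switch above.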

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileContext fc, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata,
                 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
    throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
        keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }
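
  // Migration sketch: each deprecated FileSystem-based factory above maps
  // directly onto the options-based factory (path and types illustrative):
  //
  //   // Deprecated:
  //   SequenceFile.createWriter(fs, conf, name, Text.class, Text.class,
  //                             CompressionType.RECORD);
  //   // Preferred:
  //   SequenceFile.createWriter(conf,
  //       SequenceFile.Writer.file(name),
  //       SequenceFile.Writer.keyClass(Text.class),
  //       SequenceFile.Writer.valueClass(Text.class),
  //       SequenceFile.Writer.compression(CompressionType.RECORD));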

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }


  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream.
     * Note that it will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }
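
  // A minimal ValueBytes sketch over an in-memory buffer (illustrative only;
  // this class is not part of the file):
  //
  //   class RawBytes implements SequenceFile.ValueBytes {
  //     private final byte[] data;
  //     RawBytes(byte[] data) { this.data = data; }
  //     public void writeUncompressedBytes(DataOutputStream out)
  //         throws IOException { out.write(data); }
  //     public void writeCompressedBytes(DataOutputStream out)
  //         throws IOException {
  //       throw new IllegalArgumentException("bytes are not compressed");
  //     }
  //     public int getSize() { return data.length; }
  //   }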

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        // grow at least geometrically to amortize reallocation
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;          // mark invalid until the read completes
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;          // mark invalid until the read completes
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes
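
  // The decompression pattern used by CompressedBytes.writeUncompressedBytes
  // above, in isolation (the codec, 'compressed' and 'out' are assumed):
  //
  //   DataInputBuffer raw = new DataInputBuffer();
  //   CompressionInputStream in = codec.createInputStream(raw);
  //   raw.reset(compressed, 0, compressed.length);
  //   byte[] buf = new byte[8192];
  //   for (int n; (n = in.read(buf, 0, buf.length)) != -1; ) {
  //     out.write(buf, 0, n);
  //   }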

  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0)
        throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString())
          .append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
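
  // Usage sketch: attaching custom metadata at create time and reading it
  // back later (the attribute name and value are illustrative):
  //
  //   Metadata meta = new Metadata();
  //   meta.set(new Text("schema.version"), new Text("2"));
  //   SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
  //       SequenceFile.Writer.keyClass(Text.class),
  //       SequenceFile.Writer.valueClass(Text.class),
  //       SequenceFile.Writer.metadata(meta));
  //   ...
  //   Text version = reader.getMetadata().get(new Text("schema.version"));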

  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(StandardCharsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    /** A tag interface for all of the Writer options. */
    public static interface Option {}

    static class FileOption extends Options.PathOption
                            implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption
                                      implements Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                  implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }
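
    // Illustrative composition of writer options: placement options such as
    // bufferSize/replication/blockSize are only valid together with file()
    // (the constructor below rejects them for stream-based writers).
    //
    //   SequenceFile.createWriter(conf,
    //       SequenceFile.Writer.file(path),
    //       SequenceFile.Writer.keyClass(Text.class),
    //       SequenceFile.Writer.valueClass(Text.class),
    //       SequenceFile.Writer.bufferSize(64 * 1024),
    //       SequenceFile.Writer.replication((short) 3),
    //       SequenceFile.Writer.blockSize(128L * 1024 * 1024));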

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
        AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException(
            "exactly one of file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("MetaData Option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            // Codec comparison will be ignored if the compression is NONE
            if (readerCompressionOption.value != compressionTypeOption.value
                || (readerCompressionOption.value != CompressionType.NONE
                    && readerCompressionOption.codec
                        .getClass() != compressionTypeOption.codec
                        .getClass())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }
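
    // Illustrative append sketch: reopening an existing file. The key/value
    // classes and compression settings must match the existing header, as
    // enforced above; the file's own metadata and sync marker are reused.
    //
    //   SequenceFile.createWriter(conf,
    //       SequenceFile.Writer.file(path),
    //       SequenceFile.Writer.appendIfExists(true),
    //       SequenceFile.Writer.keyClass(Text.class),
    //       SequenceFile.Writer.valueClass(Text.class),
    //       SequenceFile.Writer.compression(CompressionType.NONE));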

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                          // write the sync bytes
      out.flush();                              // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '"
                + CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
" 1265 + "Please ensure that the configuration '" + 1266 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1267 + "properly configured, if you're using" 1268 + "custom serialization."); 1269 } 1270 this.uncompressedValSerializer.open(buffer); 1271 if (this.codec != null) { 1272 ReflectionUtils.setConf(this.codec, this.conf); 1273 this.compressor = CodecPool.getCompressor(this.codec); 1274 this.deflateFilter = this.codec.createOutputStream(buffer, compressor); 1275 this.deflateOut = 1276 new DataOutputStream(new BufferedOutputStream(deflateFilter)); 1277 this.compressedValSerializer = serializationFactory.getSerializer(valClass); 1278 if (this.compressedValSerializer == null) { 1279 throw new IOException( 1280 "Could not find a serializer for the Value class: '" 1281 + valClass.getCanonicalName() + "'. " 1282 + "Please ensure that the configuration '" + 1283 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 1284 + "properly configured, if you're using" 1285 + "custom serialization."); 1286 } 1287 this.compressedValSerializer.open(deflateOut); 1288 } 1289 1290 if (appendMode) { 1291 sync(); 1292 } else { 1293 writeFileHeader(); 1294 } 1295 } 1296 1297 /** Returns the class of keys in this file. */ 1298 public Class getKeyClass() { return keyClass; } 1299 1300 /** Returns the class of values in this file. */ 1301 public Class getValueClass() { return valClass; } 1302 1303 /** Returns the compression codec of data in this file. */ 1304 public CompressionCodec getCompressionCodec() { return codec; } 1305 1306 /** create a sync point */ 1307 public void sync() throws IOException { 1308 if (sync != null && lastSyncPos != out.getPos()) { 1309 out.writeInt(SYNC_ESCAPE); // mark the start of the sync 1310 out.write(sync); // write sync 1311 lastSyncPos = out.getPos(); // update lastSyncPos 1312 } 1313 } 1314 1315 /** 1316 * flush all currently written data to the file system 1317 * @deprecated Use {@link #hsync()} or {@link #hflush()} instead 1318 */ 1319 @Deprecated 1320 public void syncFs() throws IOException { 1321 if (out != null) { 1322 out.sync(); // flush contents to file system 1323 } 1324 } 1325 1326 @Override 1327 public void hsync() throws IOException { 1328 if (out != null) { 1329 out.hsync(); 1330 } 1331 } 1332 1333 @Override 1334 public void hflush() throws IOException { 1335 if (out != null) { 1336 out.hflush(); 1337 } 1338 } 1339 1340 /** Returns the configuration of this file. */ 1341 Configuration getConf() { return conf; } 1342 1343 /** Close the file. */ 1344 @Override 1345 public synchronized void close() throws IOException { 1346 keySerializer.close(); 1347 uncompressedValSerializer.close(); 1348 if (compressedValSerializer != null) { 1349 compressedValSerializer.close(); 1350 } 1351 1352 CodecPool.returnCompressor(compressor); 1353 compressor = null; 1354 1355 if (out != null) { 1356 1357 // Close the underlying stream iff we own it... 1358 if (ownOutputStream) { 1359 out.close(); 1360 } else { 1361 out.flush(); 1362 } 1363 out = null; 1364 } 1365 } 1366 1367 synchronized void checkAndWriteSync() throws IOException { 1368 if (sync != null && 1369 out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync 1370 sync(); 1371 } 1372 } 1373 1374 /** Append a key/value pair. */ 1375 public void append(Writable key, Writable val) 1376 throws IOException { 1377 append((Object) key, (Object) val); 1378 } 1379 1380 /** Append a key/value pair. 

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                      // sync
      out.writeInt(buffer.getLength());         // total record length
      out.writeInt(keyLength);                  // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);        // total record length
      out.writeInt(keyLength);                  // key portion length
      out.write(keyData, keyOffset, keyLength); // key
      val.writeUncompressedBytes(out);          // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position. In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a
     * position returned by this method, {@link SequenceFile.Reader#next(Writable)}
     * may be called. However the key may be earlier in the file than the key
     * last written when this method was called (e.g., with block-compression,
     * it may be the first key in the block that was being written when this
     * method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }

  } // class Writer
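
  // Illustrative use of Writer.getLength(): the returned position is a safe
  // seek target for a Reader, per the javadoc above.
  //
  //   long pos = writer.getLength();
  //   ...
  //   reader.seek(pos);
  //   while (reader.next(key, value)) { ... }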

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                      // sync
      out.writeInt(buffer.getLength());         // total record length
      out.writeInt(keyLength);                  // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                      // sync
      out.writeInt(keyLength+valLength);        // total record length
      out.writeInt(keyLength);                  // key portion length
      out.write(keyData, keyOffset, keyLength); // 'key' data
      val.writeCompressedBytes(out);            // 'value' data
    }

  } // RecordCompressionWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;

    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize =
        conf.getInt(IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY,
                    IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT
                    );
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0,
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }

    /** Compress and flush contents to the underlying file system. */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressionWriter

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
  }
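
  // Configuration sketch for block compression: a block is compressed and
  // flushed once roughly this many uncompressed bytes of keys and values
  // have been buffered (see BlockCompressWriter.append above).
  //
  //   conf.setInt(IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY,   // "io.seqfile.compress.blocksize"
  //               1000000);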

  /** Reads key/value pairs from a sequence-format file. */
  public static class Reader implements java.io.Closeable {
    private String filename;
    private FSDataInputStream in;
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;

    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;

    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;

    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;

    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;

    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /**
     * A tag interface for all of the Reader options
     */
    public static interface Option {}

    /**
     * Create an option to specify the path name of the sequence file.
     * @param value the path to read
     * @return a new option
     */
    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * Create an option to specify the stream with the sequence file.
     * @param value the stream to read.
     * @return a new option
     */
    public static Option stream(FSDataInputStream value) {
      return new InputStreamOption(value);
    }

    /**
     * Create an option to specify the starting byte to read.
     * @param value the number of bytes to skip over
     * @return a new option
     */
    public static Option start(long value) {
      return new StartOption(value);
    }

    /**
     * Create an option to specify the number of bytes to read.
     * @param value the number of bytes to read
     * @return a new option
     */
    public static Option length(long value) {
      return new LengthOption(value);
    }
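
    // Split-reading sketch (splitStart/splitEnd are assumptions): a byte
    // range of a file can be processed independently by advancing to the
    // first sync point after the range start with Reader.sync(long).
    //
    //   SequenceFile.Reader r =
    //       new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
    //   r.sync(splitStart);
    //   while (r.getPosition() < splitEnd && r.next(key, value)) {
    //     ...
    //   }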
1753 * @param value the number of bytes to buffer 1754 * @return a new option 1755 */ 1756 public static Option bufferSize(int value) { 1757 return new BufferSizeOption(value); 1758 } 1759 1760 private static class FileOption extends Options.PathOption 1761 implements Option { 1762 private FileOption(Path value) { 1763 super(value); 1764 } 1765 } 1766 1767 private static class InputStreamOption 1768 extends Options.FSDataInputStreamOption 1769 implements Option { 1770 private InputStreamOption(FSDataInputStream value) { 1771 super(value); 1772 } 1773 } 1774 1775 private static class StartOption extends Options.LongOption 1776 implements Option { 1777 private StartOption(long value) { 1778 super(value); 1779 } 1780 } 1781 1782 private static class LengthOption extends Options.LongOption 1783 implements Option { 1784 private LengthOption(long value) { 1785 super(value); 1786 } 1787 } 1788 1789 private static class BufferSizeOption extends Options.IntegerOption 1790 implements Option { 1791 private BufferSizeOption(int value) { 1792 super(value); 1793 } 1794 } 1795 1796 // only used directly 1797 private static class OnlyHeaderOption extends Options.BooleanOption 1798 implements Option { 1799 private OnlyHeaderOption() { 1800 super(true); 1801 } 1802 } 1803 1804 public Reader(Configuration conf, Option... opts) throws IOException { 1805 // Look up the options, these are null if not set 1806 FileOption fileOpt = Options.getOption(FileOption.class, opts); 1807 InputStreamOption streamOpt = 1808 Options.getOption(InputStreamOption.class, opts); 1809 StartOption startOpt = Options.getOption(StartOption.class, opts); 1810 LengthOption lenOpt = Options.getOption(LengthOption.class, opts); 1811 BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); 1812 OnlyHeaderOption headerOnly = 1813 Options.getOption(OnlyHeaderOption.class, opts); 1814 // check for consistency 1815 if ((fileOpt == null) == (streamOpt == null)) { 1816 throw new 1817 IllegalArgumentException("File or stream option must be specified"); 1818 } 1819 if (fileOpt == null && bufOpt != null) { 1820 throw new IllegalArgumentException("buffer size can only be set when" + 1821 " a file is specified."); 1822 } 1823 // figure out the real values 1824 Path filename = null; 1825 FSDataInputStream file; 1826 final long len; 1827 if (fileOpt != null) { 1828 filename = fileOpt.getValue(); 1829 FileSystem fs = filename.getFileSystem(conf); 1830 int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); 1831 len = null == lenOpt 1832 ? fs.getFileStatus(filename).getLen() 1833 : lenOpt.getValue(); 1834 file = openFile(fs, filename, bufSize, len); 1835 } else { 1836 len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); 1837 file = streamOpt.getValue(); 1838 } 1839 long start = startOpt == null ? 0 : startOpt.getValue(); 1840 // really set up 1841 initialize(filename, file, start, len, conf, headerOnly != null); 1842 } 1843 1844 /** 1845 * Construct a reader by opening a file from the given file system. 1846 * @param fs The file system used to open the file. 1847 * @param file The file being read. 1848 * @param conf Configuration 1849 * @throws IOException 1850 * @deprecated Use Reader(Configuration, Option...) instead. 1851 */ 1852 @Deprecated 1853 public Reader(FileSystem fs, Path file, 1854 Configuration conf) throws IOException { 1855 this(conf, file(file.makeQualified(fs))); 1856 } 1857 1858 /** 1859 * Construct a reader by the given input stream. 1860 * @param in An input stream. 
* @param buffersize unused
1862     * @param start The starting position.
1863     * @param length The length being read.
1864     * @param conf Configuration
1865     * @throws IOException
1866     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1867     */
1868    @Deprecated
1869    public Reader(FSDataInputStream in, int buffersize,
1870        long start, long length, Configuration conf) throws IOException {
1871      this(conf, stream(in), start(start), length(length));
1872    }
1873
1874    /** Common work of the constructors. */
1875    private void initialize(Path filename, FSDataInputStream in,
1876                            long start, long length, Configuration conf,
1877                            boolean tempReader) throws IOException {
1878      if (in == null) {
1879        throw new IllegalArgumentException("in == null");
1880      }
1881      this.filename = filename == null ? "<unknown>" : filename.toString();
1882      this.in = in;
1883      this.conf = conf;
1884      boolean succeeded = false;
1885      try {
1886        seek(start);
1887        this.end = this.in.getPos() + length;
1888        // if it wrapped around, use the max
1889        if (end < length) {
1890          end = Long.MAX_VALUE;
1891        }
1892        init(tempReader);
1893        succeeded = true;
1894      } finally {
1895        if (!succeeded) {
1896          IOUtils.cleanup(LOG, this.in);
1897        }
1898      }
1899    }
1900
1901    /**
1902     * Override this method to specialize the type of
1903     * {@link FSDataInputStream} returned.
1904     * @param fs The file system used to open the file.
1905     * @param file The file being read.
1906     * @param bufferSize The buffer size used to read the file.
1907     * @param length The length being read if it is >= 0.  Otherwise,
1908     *               the length is not available.
1909     * @return The opened stream.
1910     * @throws IOException
1911     */
1912    protected FSDataInputStream openFile(FileSystem fs, Path file,
1913        int bufferSize, long length) throws IOException {
1914      return fs.open(file, bufferSize);
1915    }
1916
1917    /**
1918     * Initialize the {@link Reader}.
1919     * @param tempReader <code>true</code> if we are constructing a temporary
1920     *                   reader (see {@link SequenceFile.Sorter#cloneFileAttributes}),
1921     *                   and hence do not initialize every component;
1922     *                   <code>false</code> otherwise.
1923     * @throws IOException
1924     */
1925    private void init(boolean tempReader) throws IOException {
1926      byte[] versionBlock = new byte[VERSION.length];
1927      String exceptionMsg = this + " not a SequenceFile";
1928
1929      // Try to read sequence file header.
1930      try {
1931        in.readFully(versionBlock);
1932      } catch (EOFException e) {
1933        throw new EOFException(exceptionMsg);
1934      }
1935
1936      if ((versionBlock[0] != VERSION[0]) ||
1937          (versionBlock[1] != VERSION[1]) ||
1938          (versionBlock[2] != VERSION[2])) {
1939        throw new IOException(exceptionMsg);
1940      }
1941
1942      // Set 'version'
1943      version = versionBlock[3];
1944      if (version > VERSION[3]) {
1945        throw new VersionMismatchException(VERSION[3], version);
1946      }
1947
1948      if (version < BLOCK_COMPRESS_VERSION) {
1949        UTF8 className = new UTF8();
1950
1951        className.readFields(in);
1952        keyClassName = className.toStringChecked(); // key class name
1953
1954        className.readFields(in);
1955        valClassName = className.toStringChecked(); // val class name
1956      } else {
1957        keyClassName = Text.readString(in);
1958        valClassName = Text.readString(in);
1959      }
1960
1961      if (version > 2) {                          // if version > 2
1962        this.decompress = in.readBoolean();       // is compressed?
1963      } else {
1964        decompress = false;
1965      }
1966
1967      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1968        this.blockCompressed = in.readBoolean();  // is block-compressed?
1969 } else { 1970 blockCompressed = false; 1971 } 1972 1973 // if version >= 5 1974 // setup the compression codec 1975 if (decompress) { 1976 if (version >= CUSTOM_COMPRESS_VERSION) { 1977 String codecClassname = Text.readString(in); 1978 try { 1979 Class<? extends CompressionCodec> codecClass 1980 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); 1981 this.codec = ReflectionUtils.newInstance(codecClass, conf); 1982 } catch (ClassNotFoundException cnfe) { 1983 throw new IllegalArgumentException("Unknown codec: " + 1984 codecClassname, cnfe); 1985 } 1986 } else { 1987 codec = new DefaultCodec(); 1988 ((Configurable)codec).setConf(conf); 1989 } 1990 } 1991 1992 this.metadata = new Metadata(); 1993 if (version >= VERSION_WITH_METADATA) { // if version >= 6 1994 this.metadata.readFields(in); 1995 } 1996 1997 if (version > 1) { // if version > 1 1998 in.readFully(sync); // read sync bytes 1999 headerEnd = in.getPos(); // record end of header 2000 } 2001 2002 // Initialize... *not* if this we are constructing a temporary Reader 2003 if (!tempReader) { 2004 valBuffer = new DataInputBuffer(); 2005 if (decompress) { 2006 valDecompressor = CodecPool.getDecompressor(codec); 2007 valInFilter = codec.createInputStream(valBuffer, valDecompressor); 2008 valIn = new DataInputStream(valInFilter); 2009 } else { 2010 valIn = valBuffer; 2011 } 2012 2013 if (blockCompressed) { 2014 keyLenBuffer = new DataInputBuffer(); 2015 keyBuffer = new DataInputBuffer(); 2016 valLenBuffer = new DataInputBuffer(); 2017 2018 keyLenDecompressor = CodecPool.getDecompressor(codec); 2019 keyLenInFilter = codec.createInputStream(keyLenBuffer, 2020 keyLenDecompressor); 2021 keyLenIn = new DataInputStream(keyLenInFilter); 2022 2023 keyDecompressor = CodecPool.getDecompressor(codec); 2024 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); 2025 keyIn = new DataInputStream(keyInFilter); 2026 2027 valLenDecompressor = CodecPool.getDecompressor(codec); 2028 valLenInFilter = codec.createInputStream(valLenBuffer, 2029 valLenDecompressor); 2030 valLenIn = new DataInputStream(valLenInFilter); 2031 } 2032 2033 SerializationFactory serializationFactory = 2034 new SerializationFactory(conf); 2035 this.keyDeserializer = 2036 getDeserializer(serializationFactory, getKeyClass()); 2037 if (this.keyDeserializer == null) { 2038 throw new IOException( 2039 "Could not find a deserializer for the Key class: '" 2040 + getKeyClass().getCanonicalName() + "'. " 2041 + "Please ensure that the configuration '" + 2042 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 2043 + "properly configured, if you're using " 2044 + "custom serialization."); 2045 } 2046 if (!blockCompressed) { 2047 this.keyDeserializer.open(valBuffer); 2048 } else { 2049 this.keyDeserializer.open(keyIn); 2050 } 2051 this.valDeserializer = 2052 getDeserializer(serializationFactory, getValueClass()); 2053 if (this.valDeserializer == null) { 2054 throw new IOException( 2055 "Could not find a deserializer for the Value class: '" 2056 + getValueClass().getCanonicalName() + "'. " 2057 + "Please ensure that the configuration '" + 2058 CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is " 2059 + "properly configured, if you're using " 2060 + "custom serialization."); 2061 } 2062 this.valDeserializer.open(valIn); 2063 } 2064 } 2065 2066 @SuppressWarnings("unchecked") 2067 private Deserializer getDeserializer(SerializationFactory sf, Class c) { 2068 return sf.getDeserializer(c); 2069 } 2070 2071 /** Close the file. 
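 * <p>Closing returns any pooled decompressors to the {@link CodecPool},
 * closes the key/value deserializers, and closes the underlying input
 * stream. Because <code>Reader</code> implements {@link java.io.Closeable},
 * try-with-resources is also an option on Java 7+ (a sketch;
 * <code>path</code> is an assumption):</p>
 * <pre>
 *   try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
 *            SequenceFile.Reader.file(path))) {
 *     // ... read records ...
 *   }
 * </pre>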
*/ 2072 @Override 2073 public synchronized void close() throws IOException { 2074 // Return the decompressors to the pool 2075 CodecPool.returnDecompressor(keyLenDecompressor); 2076 CodecPool.returnDecompressor(keyDecompressor); 2077 CodecPool.returnDecompressor(valLenDecompressor); 2078 CodecPool.returnDecompressor(valDecompressor); 2079 keyLenDecompressor = keyDecompressor = null; 2080 valLenDecompressor = valDecompressor = null; 2081 2082 if (keyDeserializer != null) { 2083 keyDeserializer.close(); 2084 } 2085 if (valDeserializer != null) { 2086 valDeserializer.close(); 2087 } 2088 2089 // Close the input-stream 2090 in.close(); 2091 } 2092 2093 /** Returns the name of the key class. */ 2094 public String getKeyClassName() { 2095 return keyClassName; 2096 } 2097 2098 /** Returns the class of keys in this file. */ 2099 public synchronized Class<?> getKeyClass() { 2100 if (null == keyClass) { 2101 try { 2102 keyClass = WritableName.getClass(getKeyClassName(), conf); 2103 } catch (IOException e) { 2104 throw new RuntimeException(e); 2105 } 2106 } 2107 return keyClass; 2108 } 2109 2110 /** Returns the name of the value class. */ 2111 public String getValueClassName() { 2112 return valClassName; 2113 } 2114 2115 /** Returns the class of values in this file. */ 2116 public synchronized Class<?> getValueClass() { 2117 if (null == valClass) { 2118 try { 2119 valClass = WritableName.getClass(getValueClassName(), conf); 2120 } catch (IOException e) { 2121 throw new RuntimeException(e); 2122 } 2123 } 2124 return valClass; 2125 } 2126 2127 /** Returns true if values are compressed. */ 2128 public boolean isCompressed() { return decompress; } 2129 2130 /** Returns true if records are block-compressed. */ 2131 public boolean isBlockCompressed() { return blockCompressed; } 2132 2133 /** Returns the compression codec of data in this file. */ 2134 public CompressionCodec getCompressionCodec() { return codec; } 2135 2136 private byte[] getSync() { 2137 return sync; 2138 } 2139 2140 private byte getVersion() { 2141 return version; 2142 } 2143 2144 /** 2145 * Get the compression type for this file. 2146 * @return the compression type 2147 */ 2148 public CompressionType getCompressionType() { 2149 if (decompress) { 2150 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 2151 } else { 2152 return CompressionType.NONE; 2153 } 2154 } 2155 2156 /** Returns the metadata object of the file */ 2157 public Metadata getMetadata() { 2158 return this.metadata; 2159 } 2160 2161 /** Returns the configuration used for this file. 
*/ 2162 Configuration getConf() { return conf; } 2163 2164 /** Read a compressed buffer */ 2165 private synchronized void readBuffer(DataInputBuffer buffer, 2166 CompressionInputStream filter) throws IOException { 2167 // Read data into a temporary buffer 2168 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 2169 2170 try { 2171 int dataBufferLength = WritableUtils.readVInt(in); 2172 dataBuffer.write(in, dataBufferLength); 2173 2174 // Set up 'buffer' connected to the input-stream 2175 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 2176 } finally { 2177 dataBuffer.close(); 2178 } 2179 2180 // Reset the codec 2181 filter.resetState(); 2182 } 2183 2184 /** Read the next 'compressed' block */ 2185 private synchronized void readBlock() throws IOException { 2186 // Check if we need to throw away a whole block of 2187 // 'values' due to 'lazy decompression' 2188 if (lazyDecompress && !valuesDecompressed) { 2189 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2190 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2191 } 2192 2193 // Reset internal states 2194 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2195 valuesDecompressed = false; 2196 2197 //Process sync 2198 if (sync != null) { 2199 in.readInt(); 2200 in.readFully(syncCheck); // read syncCheck 2201 if (!Arrays.equals(sync, syncCheck)) // check it 2202 throw new IOException("File is corrupt!"); 2203 } 2204 syncSeen = true; 2205 2206 // Read number of records in this block 2207 noBufferedRecords = WritableUtils.readVInt(in); 2208 2209 // Read key lengths and keys 2210 readBuffer(keyLenBuffer, keyLenInFilter); 2211 readBuffer(keyBuffer, keyInFilter); 2212 noBufferedKeys = noBufferedRecords; 2213 2214 // Read value lengths and values 2215 if (!lazyDecompress) { 2216 readBuffer(valLenBuffer, valLenInFilter); 2217 readBuffer(valBuffer, valInFilter); 2218 noBufferedValues = noBufferedRecords; 2219 valuesDecompressed = true; 2220 } 2221 } 2222 2223 /** 2224 * Position valLenIn/valIn to the 'value' 2225 * corresponding to the 'current' key 2226 */ 2227 private synchronized void seekToCurrentValue() throws IOException { 2228 if (!blockCompressed) { 2229 if (decompress) { 2230 valInFilter.resetState(); 2231 } 2232 valBuffer.reset(); 2233 } else { 2234 // Check if this is the first value in the 'block' to be read 2235 if (lazyDecompress && !valuesDecompressed) { 2236 // Read the value lengths and values 2237 readBuffer(valLenBuffer, valLenInFilter); 2238 readBuffer(valBuffer, valInFilter); 2239 noBufferedValues = noBufferedRecords; 2240 valuesDecompressed = true; 2241 } 2242 2243 // Calculate the no. of bytes to skip 2244 // Note: 'current' key has already been read! 2245 int skipValBytes = 0; 2246 int currentKey = noBufferedKeys + 1; 2247 for (int i=noBufferedValues; i > currentKey; --i) { 2248 skipValBytes += WritableUtils.readVInt(valLenIn); 2249 --noBufferedValues; 2250 } 2251 2252 // Skip to the 'val' corresponding to 'current' key 2253 if (skipValBytes > 0) { 2254 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2255 throw new IOException("Failed to seek to " + currentKey + 2256 "(th) value!"); 2257 } 2258 } 2259 } 2260 } 2261 2262 /** 2263 * Get the 'value' corresponding to the last read 'key'. 2264 * @param val : The 'value' to be read. 
2265 * @throws IOException 2266 */ 2267 public synchronized void getCurrentValue(Writable val) 2268 throws IOException { 2269 if (val instanceof Configurable) { 2270 ((Configurable) val).setConf(this.conf); 2271 } 2272 2273 // Position stream to 'current' value 2274 seekToCurrentValue(); 2275 2276 if (!blockCompressed) { 2277 val.readFields(valIn); 2278 2279 if (valIn.read() > 0) { 2280 LOG.info("available bytes: " + valIn.available()); 2281 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2282 + " bytes, should read " + 2283 (valBuffer.getLength()-keyLength)); 2284 } 2285 } else { 2286 // Get the value 2287 int valLength = WritableUtils.readVInt(valLenIn); 2288 val.readFields(valIn); 2289 2290 // Read another compressed 'value' 2291 --noBufferedValues; 2292 2293 // Sanity check 2294 if ((valLength < 0) && LOG.isDebugEnabled()) { 2295 LOG.debug(val + " is a zero-length value"); 2296 } 2297 } 2298 2299 } 2300 2301 /** 2302 * Get the 'value' corresponding to the last read 'key'. 2303 * @param val : The 'value' to be read. 2304 * @throws IOException 2305 */ 2306 public synchronized Object getCurrentValue(Object val) 2307 throws IOException { 2308 if (val instanceof Configurable) { 2309 ((Configurable) val).setConf(this.conf); 2310 } 2311 2312 // Position stream to 'current' value 2313 seekToCurrentValue(); 2314 2315 if (!blockCompressed) { 2316 val = deserializeValue(val); 2317 2318 if (valIn.read() > 0) { 2319 LOG.info("available bytes: " + valIn.available()); 2320 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2321 + " bytes, should read " + 2322 (valBuffer.getLength()-keyLength)); 2323 } 2324 } else { 2325 // Get the value 2326 int valLength = WritableUtils.readVInt(valLenIn); 2327 val = deserializeValue(val); 2328 2329 // Read another compressed 'value' 2330 --noBufferedValues; 2331 2332 // Sanity check 2333 if ((valLength < 0) && LOG.isDebugEnabled()) { 2334 LOG.debug(val + " is a zero-length value"); 2335 } 2336 } 2337 return val; 2338 2339 } 2340 2341 @SuppressWarnings("unchecked") 2342 private Object deserializeValue(Object val) throws IOException { 2343 return valDeserializer.deserialize(val); 2344 } 2345 2346 /** Read the next key in the file into <code>key</code>, skipping its 2347 * value. True if another entry exists, and false at end of file. */ 2348 public synchronized boolean next(Writable key) throws IOException { 2349 if (key.getClass() != getKeyClass()) 2350 throw new IOException("wrong key class: "+key.getClass().getName() 2351 +" is not "+keyClass); 2352 2353 if (!blockCompressed) { 2354 outBuf.reset(); 2355 2356 keyLength = next(outBuf); 2357 if (keyLength < 0) 2358 return false; 2359 2360 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2361 2362 key.readFields(valBuffer); 2363 valBuffer.mark(0); 2364 if (valBuffer.getPosition() != keyLength) 2365 throw new IOException(key + " read " + valBuffer.getPosition() 2366 + " bytes, should read " + keyLength); 2367 } else { 2368 //Reset syncSeen 2369 syncSeen = false; 2370 2371 if (noBufferedKeys == 0) { 2372 try { 2373 readBlock(); 2374 } catch (EOFException eof) { 2375 return false; 2376 } 2377 } 2378 2379 int keyLength = WritableUtils.readVInt(keyLenIn); 2380 2381 // Sanity check 2382 if (keyLength < 0) { 2383 return false; 2384 } 2385 2386 //Read another compressed 'key' 2387 key.readFields(keyIn); 2388 --noBufferedKeys; 2389 } 2390 2391 return true; 2392 } 2393 2394 /** Read the next key/value pair in the file into <code>key</code> and 2395 * <code>val</code>. 
Returns true if such a pair exists and false when at
2396     * end of file */
2397    public synchronized boolean next(Writable key, Writable val)
2398      throws IOException {
2399      if (val.getClass() != getValueClass())
2400        throw new IOException("wrong value class: "+val+" is not "+valClass);
2401
2402      boolean more = next(key);
2403
2404      if (more) {
2405        getCurrentValue(val);
2406      }
2407
2408      return more;
2409    }
2410
2411    /**
2412     * Read and return the next record length, potentially skipping over
2413     * a sync block.
2414     * @return the length of the next record or -1 if there is no next record
2415     * @throws IOException
2416     */
2417    private synchronized int readRecordLength() throws IOException {
2418      if (in.getPos() >= end) {
2419        return -1;
2420      }
2421      int length = in.readInt();
2422      if (version > 1 && sync != null &&
2423          length == SYNC_ESCAPE) {              // process a sync entry
2424        in.readFully(syncCheck);                // read syncCheck
2425        if (!Arrays.equals(sync, syncCheck))    // check it
2426          throw new IOException("File is corrupt!");
2427        syncSeen = true;
2428        if (in.getPos() >= end) {
2429          return -1;
2430        }
2431        length = in.readInt();                  // re-read length
2432      } else {
2433        syncSeen = false;
2434      }
2435
2436      return length;
2437    }
2438
2439    /** Read the next key/value pair in the file into <code>buffer</code>.
2440     * Returns the length of the key read, or -1 if at end of file.  The length
2441     * of the value may be computed by calling buffer.getLength() before and
2442     * after calls to this method.
2443     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2444    @Deprecated
2445    synchronized int next(DataOutputBuffer buffer) throws IOException {
2446      // Unsupported for block-compressed sequence files
2447      if (blockCompressed) {
2448        throw new IOException("Unsupported call for block-compressed" +
2449            " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
2450      }
2451      try {
2452        int length = readRecordLength();
2453        if (length == -1) {
2454          return -1;
2455        }
2456        int keyLength = in.readInt();
2457        buffer.write(in, length);
2458        return keyLength;
2459      } catch (ChecksumException e) {             // checksum failure
2460        handleChecksumException(e);
2461        return next(buffer);
2462      }
2463    }
2464
2465    public ValueBytes createValueBytes() {
2466      ValueBytes val = null;
2467      if (!decompress || blockCompressed) {
2468        val = new UncompressedBytes();
2469      } else {
2470        val = new CompressedBytes(codec);
2471      }
2472      return val;
2473    }
2474
2475    /**
2476     * Read 'raw' records.
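 * <p>Raw reads copy the key bytes into <code>key</code> and wrap the value
 * bytes in <code>val</code> without deserializing either; this is the path
 * the {@link Sorter} uses. A sketch of a raw scan (the <code>reader</code>
 * variable is an assumption):</p>
 * <pre>
 *   DataOutputBuffer rawKey = new DataOutputBuffer();
 *   SequenceFile.ValueBytes rawVal = reader.createValueBytes();
 *   while (reader.nextRaw(rawKey, rawVal) != -1) {
 *     // the first rawKey.getLength() bytes of rawKey.getData() hold the key
 *     rawKey.reset();
 *   }
 * </pre>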
2477     * @param key - The buffer into which the key is read
2478     * @param val - The 'raw' value
2479     * @return Returns the total record length or -1 for end of file
2480     * @throws IOException
2481     */
2482    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val)
2483      throws IOException {
2484      if (!blockCompressed) {
2485        int length = readRecordLength();
2486        if (length == -1) {
2487          return -1;
2488        }
2489        int keyLength = in.readInt();
2490        int valLength = length - keyLength;
2491        key.write(in, keyLength);
2492        if (decompress) {
2493          CompressedBytes value = (CompressedBytes)val;
2494          value.reset(in, valLength);
2495        } else {
2496          UncompressedBytes value = (UncompressedBytes)val;
2497          value.reset(in, valLength);
2498        }
2499
2500        return length;
2501      } else {
2502        //Reset syncSeen
2503        syncSeen = false;
2504
2505        // Read 'key'
2506        if (noBufferedKeys == 0) {
2507          if (in.getPos() >= end)
2508            return -1;
2509
2510          try {
2511            readBlock();
2512          } catch (EOFException eof) {
2513            return -1;
2514          }
2515        }
2516        int keyLength = WritableUtils.readVInt(keyLenIn);
2517        if (keyLength < 0) {
2518          throw new IOException("negative-length key found!");
2519        }
2520        key.write(keyIn, keyLength);
2521        --noBufferedKeys;
2522
2523        // Read raw 'value'
2524        seekToCurrentValue();
2525        int valLength = WritableUtils.readVInt(valLenIn);
2526        UncompressedBytes rawValue = (UncompressedBytes)val;
2527        rawValue.reset(valIn, valLength);
2528        --noBufferedValues;
2529
2530        return (keyLength+valLength);
2531      }
2532
2533    }
2534
2535    /**
2536     * Read 'raw' keys.
2537     * @param key - The buffer into which the key is read
2538     * @return Returns the key length or -1 for end of file
2539     * @throws IOException
2540     */
2541    public synchronized int nextRawKey(DataOutputBuffer key)
2542      throws IOException {
2543      if (!blockCompressed) {
2544        recordLength = readRecordLength();
2545        if (recordLength == -1) {
2546          return -1;
2547        }
2548        keyLength = in.readInt();
2549        key.write(in, keyLength);
2550        return keyLength;
2551      } else {
2552        //Reset syncSeen
2553        syncSeen = false;
2554
2555        // Read 'key'
2556        if (noBufferedKeys == 0) {
2557          if (in.getPos() >= end)
2558            return -1;
2559
2560          try {
2561            readBlock();
2562          } catch (EOFException eof) {
2563            return -1;
2564          }
2565        }
2566        int keyLength = WritableUtils.readVInt(keyLenIn);
2567        if (keyLength < 0) {
2568          throw new IOException("negative-length key found!");
2569        }
2570        key.write(keyIn, keyLength);
2571        --noBufferedKeys;
2572
2573        return keyLength;
2574      }
2575
2576    }
2577
2578    /** Read the next key in the file, skipping its
2579     * value.  Return null at end of file.
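 * <p>This variant allows keys to be scanned without deserializing values
 * (a sketch; <code>reader</code> is an assumption):</p>
 * <pre>
 *   Object key = null;
 *   while ((key = reader.next(key)) != null) {
 *     // inspect key; call getCurrentValue(...) only if the value is needed
 *   }
 * </pre>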
*/ 2580 public synchronized Object next(Object key) throws IOException { 2581 if (key != null && key.getClass() != getKeyClass()) { 2582 throw new IOException("wrong key class: "+key.getClass().getName() 2583 +" is not "+keyClass); 2584 } 2585 2586 if (!blockCompressed) { 2587 outBuf.reset(); 2588 2589 keyLength = next(outBuf); 2590 if (keyLength < 0) 2591 return null; 2592 2593 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2594 2595 key = deserializeKey(key); 2596 valBuffer.mark(0); 2597 if (valBuffer.getPosition() != keyLength) 2598 throw new IOException(key + " read " + valBuffer.getPosition() 2599 + " bytes, should read " + keyLength); 2600 } else { 2601 //Reset syncSeen 2602 syncSeen = false; 2603 2604 if (noBufferedKeys == 0) { 2605 try { 2606 readBlock(); 2607 } catch (EOFException eof) { 2608 return null; 2609 } 2610 } 2611 2612 int keyLength = WritableUtils.readVInt(keyLenIn); 2613 2614 // Sanity check 2615 if (keyLength < 0) { 2616 return null; 2617 } 2618 2619 //Read another compressed 'key' 2620 key = deserializeKey(key); 2621 --noBufferedKeys; 2622 } 2623 2624 return key; 2625 } 2626 2627 @SuppressWarnings("unchecked") 2628 private Object deserializeKey(Object key) throws IOException { 2629 return keyDeserializer.deserialize(key); 2630 } 2631 2632 /** 2633 * Read 'raw' values. 2634 * @param val - The 'raw' value 2635 * @return Returns the value length 2636 * @throws IOException 2637 */ 2638 public synchronized int nextRawValue(ValueBytes val) 2639 throws IOException { 2640 2641 // Position stream to current value 2642 seekToCurrentValue(); 2643 2644 if (!blockCompressed) { 2645 int valLength = recordLength - keyLength; 2646 if (decompress) { 2647 CompressedBytes value = (CompressedBytes)val; 2648 value.reset(in, valLength); 2649 } else { 2650 UncompressedBytes value = (UncompressedBytes)val; 2651 value.reset(in, valLength); 2652 } 2653 2654 return valLength; 2655 } else { 2656 int valLength = WritableUtils.readVInt(valLenIn); 2657 UncompressedBytes rawValue = (UncompressedBytes)val; 2658 rawValue.reset(valIn, valLength); 2659 --noBufferedValues; 2660 return valLength; 2661 } 2662 2663 } 2664 2665 private void handleChecksumException(ChecksumException e) 2666 throws IOException { 2667 if (this.conf.getBoolean( 2668 IO_SKIP_CHECKSUM_ERRORS_KEY, IO_SKIP_CHECKSUM_ERRORS_DEFAULT)) { 2669 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2670 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2671 } else { 2672 throw e; 2673 } 2674 } 2675 2676 /** disables sync. often invoked for tmp files */ 2677 synchronized void ignoreSync() { 2678 sync = null; 2679 } 2680 2681 /** Set the current byte position in the input file. 2682 * 2683 * <p>The position passed must be a position returned by {@link 2684 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2685 * position, use {@link SequenceFile.Reader#sync(long)}. 
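 * <p>The usual pattern for reading an arbitrary byte range, such as an
 * input split, is therefore to re-align on a sync marker first (a sketch;
 * <code>splitStart</code>, <code>splitEnd</code>, and the key/value types
 * are assumptions):</p>
 * <pre>
 *   Text key = new Text();
 *   Text val = new Text();
 *   reader.sync(splitStart);   // advance to the next sync marker
 *   while (reader.getPosition() &lt; splitEnd &amp;&amp; reader.next(key, val)) {
 *     // each iteration starts at a record boundary
 *   }
 * </pre>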
2686 */ 2687 public synchronized void seek(long position) throws IOException { 2688 in.seek(position); 2689 if (blockCompressed) { // trigger block read 2690 noBufferedKeys = 0; 2691 valuesDecompressed = true; 2692 } 2693 } 2694 2695 /** Seek to the next sync mark past a given position.*/ 2696 public synchronized void sync(long position) throws IOException { 2697 if (position+SYNC_SIZE >= end) { 2698 seek(end); 2699 return; 2700 } 2701 2702 if (position < headerEnd) { 2703 // seek directly to first record 2704 in.seek(headerEnd); 2705 // note the sync marker "seen" in the header 2706 syncSeen = true; 2707 return; 2708 } 2709 2710 try { 2711 seek(position+4); // skip escape 2712 in.readFully(syncCheck); 2713 int syncLen = sync.length; 2714 for (int i = 0; in.getPos() < end; i++) { 2715 int j = 0; 2716 for (; j < syncLen; j++) { 2717 if (sync[j] != syncCheck[(i+j)%syncLen]) 2718 break; 2719 } 2720 if (j == syncLen) { 2721 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2722 return; 2723 } 2724 syncCheck[i%syncLen] = in.readByte(); 2725 } 2726 } catch (ChecksumException e) { // checksum failure 2727 handleChecksumException(e); 2728 } 2729 } 2730 2731 /** Returns true iff the previous call to next passed a sync mark.*/ 2732 public synchronized boolean syncSeen() { return syncSeen; } 2733 2734 /** Return the current byte position in the input file. */ 2735 public synchronized long getPosition() throws IOException { 2736 return in.getPos(); 2737 } 2738 2739 /** Returns the name of the file. */ 2740 @Override 2741 public String toString() { 2742 return filename; 2743 } 2744 2745 } 2746 2747 /** Sorts key/value pairs in a sequence-format file. 2748 * 2749 * <p>For best performance, applications should make sure that the {@link 2750 * Writable#readFields(DataInput)} implementation of their keys is 2751 * very efficient. In particular, it should avoid allocating memory. 2752 */ 2753 public static class Sorter { 2754 2755 private RawComparator comparator; 2756 2757 private MergeSort mergeSort; //the implementation of merge sort 2758 2759 private Path[] inFiles; // when merging or sorting 2760 2761 private Path outFile; 2762 2763 private int memory; // bytes 2764 private int factor; // merged per pass 2765 2766 private FileSystem fs = null; 2767 2768 private Class keyClass; 2769 private Class valClass; 2770 2771 private Configuration conf; 2772 private Metadata metadata; 2773 2774 private Progressable progressable = null; 2775 2776 /** Sort and merge files containing the named classes. */ 2777 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2778 Class valClass, Configuration conf) { 2779 this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf); 2780 } 2781 2782 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2783 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2784 Class valClass, Configuration conf) { 2785 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2786 } 2787 2788 /** Sort and merge using an arbitrary {@link RawComparator}. 
*/ 2789 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2790 Class valClass, Configuration conf, Metadata metadata) { 2791 this.fs = fs; 2792 this.comparator = comparator; 2793 this.keyClass = keyClass; 2794 this.valClass = valClass; 2795 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2796 this.factor = conf.getInt("io.sort.factor", 100); 2797 this.conf = conf; 2798 this.metadata = metadata; 2799 } 2800 2801 /** Set the number of streams to merge at once.*/ 2802 public void setFactor(int factor) { this.factor = factor; } 2803 2804 /** Get the number of streams to merge at once.*/ 2805 public int getFactor() { return factor; } 2806 2807 /** Set the total amount of buffer memory, in bytes.*/ 2808 public void setMemory(int memory) { this.memory = memory; } 2809 2810 /** Get the total amount of buffer memory, in bytes.*/ 2811 public int getMemory() { return memory; } 2812 2813 /** Set the progressable object in order to report progress. */ 2814 public void setProgressable(Progressable progressable) { 2815 this.progressable = progressable; 2816 } 2817 2818 /** 2819 * Perform a file sort from a set of input files into an output file. 2820 * @param inFiles the files to be sorted 2821 * @param outFile the sorted output file 2822 * @param deleteInput should the input files be deleted as they are read? 2823 */ 2824 public void sort(Path[] inFiles, Path outFile, 2825 boolean deleteInput) throws IOException { 2826 if (fs.exists(outFile)) { 2827 throw new IOException("already exists: " + outFile); 2828 } 2829 2830 this.inFiles = inFiles; 2831 this.outFile = outFile; 2832 2833 int segments = sortPass(deleteInput); 2834 if (segments > 1) { 2835 mergePass(outFile.getParent()); 2836 } 2837 } 2838 2839 /** 2840 * Perform a file sort from a set of input files and return an iterator. 2841 * @param inFiles the files to be sorted 2842 * @param tempDir the directory where temp files are created during sort 2843 * @param deleteInput should the input files be deleted as they are read? 2844 * @return iterator the RawKeyValueIterator 2845 */ 2846 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2847 boolean deleteInput) throws IOException { 2848 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2849 if (fs.exists(outFile)) { 2850 throw new IOException("already exists: " + outFile); 2851 } 2852 this.inFiles = inFiles; 2853 //outFile will basically be used as prefix for temp files in the cases 2854 //where sort outputs multiple sorted segments. For the single segment 2855 //case, the outputFile itself will contain the sorted data for that 2856 //segment 2857 this.outFile = outFile; 2858 2859 int segments = sortPass(deleteInput); 2860 if (segments > 1) 2861 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2862 tempDir); 2863 else if (segments == 1) 2864 return merge(new Path[]{outFile}, true, tempDir); 2865 else return null; 2866 } 2867 2868 /** 2869 * The backwards compatible interface to sort. 
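 * <p>Equivalent to <code>sort(new Path[]{inFile}, outFile, false)</code>:
 * a single input file, with the input preserved rather than deleted.</p>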
2870 * @param inFile the input file to sort 2871 * @param outFile the sorted output file 2872 */ 2873 public void sort(Path inFile, Path outFile) throws IOException { 2874 sort(new Path[]{inFile}, outFile, false); 2875 } 2876 2877 private int sortPass(boolean deleteInput) throws IOException { 2878 if(LOG.isDebugEnabled()) { 2879 LOG.debug("running sort pass"); 2880 } 2881 SortPass sortPass = new SortPass(); // make the SortPass 2882 sortPass.setProgressable(progressable); 2883 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2884 try { 2885 return sortPass.run(deleteInput); // run it 2886 } finally { 2887 sortPass.close(); // close it 2888 } 2889 } 2890 2891 private class SortPass { 2892 private int memoryLimit = memory/4; 2893 private int recordLimit = 1000000; 2894 2895 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2896 private byte[] rawBuffer; 2897 2898 private int[] keyOffsets = new int[1024]; 2899 private int[] pointers = new int[keyOffsets.length]; 2900 private int[] pointersCopy = new int[keyOffsets.length]; 2901 private int[] keyLengths = new int[keyOffsets.length]; 2902 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2903 2904 private ArrayList segmentLengths = new ArrayList(); 2905 2906 private Reader in = null; 2907 private FSDataOutputStream out = null; 2908 private FSDataOutputStream indexOut = null; 2909 private Path outName; 2910 2911 private Progressable progressable = null; 2912 2913 public int run(boolean deleteInput) throws IOException { 2914 int segments = 0; 2915 int currentFile = 0; 2916 boolean atEof = (currentFile >= inFiles.length); 2917 CompressionType compressionType; 2918 CompressionCodec codec = null; 2919 segmentLengths.clear(); 2920 if (atEof) { 2921 return 0; 2922 } 2923 2924 // Initialize 2925 in = new Reader(fs, inFiles[currentFile], conf); 2926 compressionType = in.getCompressionType(); 2927 codec = in.getCompressionCodec(); 2928 2929 for (int i=0; i < rawValues.length; ++i) { 2930 rawValues[i] = null; 2931 } 2932 2933 while (!atEof) { 2934 int count = 0; 2935 int bytesProcessed = 0; 2936 rawKeys.reset(); 2937 while (!atEof && 2938 bytesProcessed < memoryLimit && count < recordLimit) { 2939 2940 // Read a record into buffer 2941 // Note: Attempt to re-use 'rawValue' as far as possible 2942 int keyOffset = rawKeys.getLength(); 2943 ValueBytes rawValue = 2944 (count == keyOffsets.length || rawValues[count] == null) ? 
2945 in.createValueBytes() : 2946 rawValues[count]; 2947 int recordLength = in.nextRaw(rawKeys, rawValue); 2948 if (recordLength == -1) { 2949 in.close(); 2950 if (deleteInput) { 2951 fs.delete(inFiles[currentFile], true); 2952 } 2953 currentFile += 1; 2954 atEof = currentFile >= inFiles.length; 2955 if (!atEof) { 2956 in = new Reader(fs, inFiles[currentFile], conf); 2957 } else { 2958 in = null; 2959 } 2960 continue; 2961 } 2962 2963 int keyLength = rawKeys.getLength() - keyOffset; 2964 2965 if (count == keyOffsets.length) 2966 grow(); 2967 2968 keyOffsets[count] = keyOffset; // update pointers 2969 pointers[count] = count; 2970 keyLengths[count] = keyLength; 2971 rawValues[count] = rawValue; 2972 2973 bytesProcessed += recordLength; 2974 count++; 2975 } 2976 2977 // buffer is full -- sort & flush it 2978 if(LOG.isDebugEnabled()) { 2979 LOG.debug("flushing segment " + segments); 2980 } 2981 rawBuffer = rawKeys.getData(); 2982 sort(count); 2983 // indicate we're making progress 2984 if (progressable != null) { 2985 progressable.progress(); 2986 } 2987 flush(count, bytesProcessed, compressionType, codec, 2988 segments==0 && atEof); 2989 segments++; 2990 } 2991 return segments; 2992 } 2993 2994 public void close() throws IOException { 2995 if (in != null) { 2996 in.close(); 2997 } 2998 if (out != null) { 2999 out.close(); 3000 } 3001 if (indexOut != null) { 3002 indexOut.close(); 3003 } 3004 } 3005 3006 private void grow() { 3007 int newLength = keyOffsets.length * 3 / 2; 3008 keyOffsets = grow(keyOffsets, newLength); 3009 pointers = grow(pointers, newLength); 3010 pointersCopy = new int[newLength]; 3011 keyLengths = grow(keyLengths, newLength); 3012 rawValues = grow(rawValues, newLength); 3013 } 3014 3015 private int[] grow(int[] old, int newLength) { 3016 int[] result = new int[newLength]; 3017 System.arraycopy(old, 0, result, 0, old.length); 3018 return result; 3019 } 3020 3021 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 3022 ValueBytes[] result = new ValueBytes[newLength]; 3023 System.arraycopy(old, 0, result, 0, old.length); 3024 for (int i=old.length; i < newLength; ++i) { 3025 result[i] = null; 3026 } 3027 return result; 3028 } 3029 3030 private void flush(int count, int bytesProcessed, 3031 CompressionType compressionType, 3032 CompressionCodec codec, 3033 boolean done) throws IOException { 3034 if (out == null) { 3035 outName = done ? outFile : outFile.suffix(".0"); 3036 out = fs.create(outName); 3037 if (!done) { 3038 indexOut = fs.create(outName.suffix(".index")); 3039 } 3040 } 3041 3042 long segmentStart = out.getPos(); 3043 Writer writer = createWriter(conf, Writer.stream(out), 3044 Writer.keyClass(keyClass), Writer.valueClass(valClass), 3045 Writer.compression(compressionType, codec), 3046 Writer.metadata(done ? 
metadata : new Metadata()));
3047
3048        if (!done) {
3049          writer.sync = null;                     // disable sync on temp files
3050        }
3051
3052        for (int i = 0; i < count; i++) {         // write in sorted order
3053          int p = pointers[i];
3054          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
3055        }
3056        writer.close();
3057
3058        if (!done) {
3059          // Save the segment length
3060          WritableUtils.writeVLong(indexOut, segmentStart);
3061          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
3062          indexOut.flush();
3063        }
3064      }
3065
3066      private void sort(int count) {
3067        System.arraycopy(pointers, 0, pointersCopy, 0, count);
3068        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
3069      }
3070      class SeqFileComparator implements Comparator<IntWritable> {
3071        @Override
3072        public int compare(IntWritable I, IntWritable J) {
3073          return comparator.compare(rawBuffer, keyOffsets[I.get()],
3074                                    keyLengths[I.get()], rawBuffer,
3075                                    keyOffsets[J.get()], keyLengths[J.get()]);
3076        }
3077      }
3078
3079      /** set the progressable object in order to report progress */
3080      public void setProgressable(Progressable progressable)
3081      {
3082        this.progressable = progressable;
3083      }
3084
3085    } // SequenceFile.Sorter.SortPass
3086
3087    /** The interface to iterate over raw keys/values of SequenceFiles. */
3088    public static interface RawKeyValueIterator {
3089      /** Gets the current raw key
3090       * @return DataOutputBuffer
3091       * @throws IOException
3092       */
3093      DataOutputBuffer getKey() throws IOException;
3094      /** Gets the current raw value
3095       * @return ValueBytes
3096       * @throws IOException
3097       */
3098      ValueBytes getValue() throws IOException;
3099      /** Sets up the current key and value (for getKey and getValue)
3100       * @return true if there exists a key/value, false otherwise
3101       * @throws IOException
3102       */
3103      boolean next() throws IOException;
3104      /** closes the iterator so that the underlying streams can be closed
3105       * @throws IOException
3106       */
3107      void close() throws IOException;
3108      /** Gets the Progress object; this has a float (0.0 - 1.0)
3109       * indicating the bytes processed by the iterator so far
3110       */
3111      Progress getProgress();
3112    }
3113
3114    /**
3115     * Merges the list of segments of type <code>SegmentDescriptor</code>
3116     * @param segments the list of SegmentDescriptors
3117     * @param tmpDir the directory to write temporary files into
3118     * @return RawKeyValueIterator
3119     * @throws IOException
3120     */
3121    public RawKeyValueIterator merge(List <SegmentDescriptor> segments,
3122                                     Path tmpDir)
3123      throws IOException {
3124      // pass in object to report progress, if present
3125      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3126      return mQueue.merge();
3127    }
3128
3129    /**
3130     * Merges the contents of files passed in Path[] using a max factor value
3131     * that is already set
3132     * @param inNames the array of path names
3133     * @param deleteInputs true if the input files should be deleted when
3134     *        unnecessary
3135     * @param tmpDir the directory to write temporary files into
3136     * @return the RawKeyValueIterator
3137     * @throws IOException
3138     */
3139    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3140                                     Path tmpDir)
3141      throws IOException {
3142      return merge(inNames, deleteInputs,
3143                   (inNames.length < factor) ? inNames.length : factor,
3144                   tmpDir);
3145    }
3146
3147    /**
3148     * Merges the contents of files passed in Path[]
3149     * @param inNames the array of path names
3150     * @param deleteInputs true if the input files should be deleted when
3151     *        unnecessary
3152     * @param factor the factor that will be used as the maximum merge fan-in
3153     * @param tmpDir the directory to write temporary files into
3154     * @return the RawKeyValueIterator
3155     * @throws IOException
3156     */
3157    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3158                                     int factor, Path tmpDir)
3159      throws IOException {
3160      //get the segments from inNames
3161      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3162      for (int i = 0; i < inNames.length; i++) {
3163        SegmentDescriptor s = new SegmentDescriptor(0,
3164            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3165        s.preserveInput(!deleteInputs);
3166        s.doSync();
3167        a.add(s);
3168      }
3169      this.factor = factor;
3170      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3171      return mQueue.merge();
3172    }
3173
3174    /**
3175     * Merges the contents of files passed in Path[]
3176     * @param inNames the array of path names
3177     * @param tempDir the directory for creating temp files during merge
3178     * @param deleteInputs true if the input files should be deleted when
3179     *        unnecessary
3180     * @return the RawKeyValueIterator
3181     * @throws IOException
3182     */
3183    public RawKeyValueIterator merge(Path [] inNames, Path tempDir,
3184                                     boolean deleteInputs)
3185      throws IOException {
3186      //outFile will basically be used as prefix for temp files for the
3187      //intermediate merge outputs
3188      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3189      //get the segments from inNames
3190      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3191      for (int i = 0; i < inNames.length; i++) {
3192        SegmentDescriptor s = new SegmentDescriptor(0,
3193            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3194        s.preserveInput(!deleteInputs);
3195        s.doSync();
3196        a.add(s);
3197      }
3198      factor = (inNames.length < factor) ?
inNames.length : factor; 3199 // pass in object to report progress, if present 3200 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); 3201 return mQueue.merge(); 3202 } 3203 3204 /** 3205 * Clones the attributes (like compression of the input file and creates a 3206 * corresponding Writer 3207 * @param inputFile the path of the input file whose attributes should be 3208 * cloned 3209 * @param outputFile the path of the output file 3210 * @param prog the Progressable to report status during the file write 3211 * @return Writer 3212 * @throws IOException 3213 */ 3214 public Writer cloneFileAttributes(Path inputFile, Path outputFile, 3215 Progressable prog) throws IOException { 3216 Reader reader = new Reader(conf, 3217 Reader.file(inputFile), 3218 new Reader.OnlyHeaderOption()); 3219 CompressionType compress = reader.getCompressionType(); 3220 CompressionCodec codec = reader.getCompressionCodec(); 3221 reader.close(); 3222 3223 Writer writer = createWriter(conf, 3224 Writer.file(outputFile), 3225 Writer.keyClass(keyClass), 3226 Writer.valueClass(valClass), 3227 Writer.compression(compress, codec), 3228 Writer.progressable(prog)); 3229 return writer; 3230 } 3231 3232 /** 3233 * Writes records from RawKeyValueIterator into a file represented by the 3234 * passed writer 3235 * @param records the RawKeyValueIterator 3236 * @param writer the Writer created earlier 3237 * @throws IOException 3238 */ 3239 public void writeFile(RawKeyValueIterator records, Writer writer) 3240 throws IOException { 3241 while(records.next()) { 3242 writer.appendRaw(records.getKey().getData(), 0, 3243 records.getKey().getLength(), records.getValue()); 3244 } 3245 writer.sync(); 3246 } 3247 3248 /** Merge the provided files. 3249 * @param inFiles the array of input path names 3250 * @param outFile the final output file 3251 * @throws IOException 3252 */ 3253 public void merge(Path[] inFiles, Path outFile) throws IOException { 3254 if (fs.exists(outFile)) { 3255 throw new IOException("already exists: " + outFile); 3256 } 3257 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); 3258 Writer writer = cloneFileAttributes(inFiles[0], outFile, null); 3259 3260 writeFile(r, writer); 3261 3262 writer.close(); 3263 } 3264 3265 /** sort calls this to generate the final merged output */ 3266 private int mergePass(Path tmpDir) throws IOException { 3267 if(LOG.isDebugEnabled()) { 3268 LOG.debug("running merge pass"); 3269 } 3270 Writer writer = cloneFileAttributes( 3271 outFile.suffix(".0"), outFile, null); 3272 RawKeyValueIterator r = merge(outFile.suffix(".0"), 3273 outFile.suffix(".0.index"), tmpDir); 3274 writeFile(r, writer); 3275 3276 writer.close(); 3277 return 0; 3278 } 3279 3280 /** Used by mergePass to merge the output of the sort 3281 * @param inName the name of the input file containing sorted segments 3282 * @param indexIn the offsets of the sorted segments 3283 * @param tmpDir the relative directory to store intermediate results in 3284 * @return RawKeyValueIterator 3285 * @throws IOException 3286 */ 3287 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 3288 throws IOException { 3289 //get the segments from indexIn 3290 //we create a SegmentContainer so that we can track segments belonging to 3291 //inName and delete inName as soon as we see that we have looked at all 3292 //the contained segments during the merge process & hence don't need 3293 //them anymore 3294 SegmentContainer container = new SegmentContainer(inName, indexIn); 3295 MergeQueue mQueue = new 
MergeQueue(container.getSegmentList(), tmpDir, progressable); 3296 return mQueue.merge(); 3297 } 3298 3299 /** This class implements the core of the merge logic */ 3300 private class MergeQueue extends PriorityQueue 3301 implements RawKeyValueIterator { 3302 private boolean compress; 3303 private boolean blockCompress; 3304 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3305 private ValueBytes rawValue; 3306 private long totalBytesProcessed; 3307 private float progPerByte; 3308 private Progress mergeProgress = new Progress(); 3309 private Path tmpDir; 3310 private Progressable progress = null; //handle to the progress reporting object 3311 private SegmentDescriptor minSegment; 3312 3313 //a TreeMap used to store the segments sorted by size (segment offset and 3314 //segment path name is used to break ties between segments of same sizes) 3315 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3316 new TreeMap<SegmentDescriptor, Void>(); 3317 3318 @SuppressWarnings("unchecked") 3319 public void put(SegmentDescriptor stream) throws IOException { 3320 if (size() == 0) { 3321 compress = stream.in.isCompressed(); 3322 blockCompress = stream.in.isBlockCompressed(); 3323 } else if (compress != stream.in.isCompressed() || 3324 blockCompress != stream.in.isBlockCompressed()) { 3325 throw new IOException("All merged files must be compressed or not."); 3326 } 3327 super.put(stream); 3328 } 3329 3330 /** 3331 * A queue of file segments to merge 3332 * @param segments the file segments to merge 3333 * @param tmpDir a relative local directory to save intermediate files in 3334 * @param progress the reference to the Progressable object 3335 */ 3336 public MergeQueue(List <SegmentDescriptor> segments, 3337 Path tmpDir, Progressable progress) { 3338 int size = segments.size(); 3339 for (int i = 0; i < size; i++) { 3340 sortedSegmentSizes.put(segments.get(i), null); 3341 } 3342 this.tmpDir = tmpDir; 3343 this.progress = progress; 3344 } 3345 @Override 3346 protected boolean lessThan(Object a, Object b) { 3347 // indicate we're making progress 3348 if (progress != null) { 3349 progress.progress(); 3350 } 3351 SegmentDescriptor msa = (SegmentDescriptor)a; 3352 SegmentDescriptor msb = (SegmentDescriptor)b; 3353 return comparator.compare(msa.getKey().getData(), 0, 3354 msa.getKey().getLength(), msb.getKey().getData(), 0, 3355 msb.getKey().getLength()) < 0; 3356 } 3357 @Override 3358 public void close() throws IOException { 3359 SegmentDescriptor ms; // close inputs 3360 while ((ms = (SegmentDescriptor)pop()) != null) { 3361 ms.cleanup(); 3362 } 3363 minSegment = null; 3364 } 3365 @Override 3366 public DataOutputBuffer getKey() throws IOException { 3367 return rawKey; 3368 } 3369 @Override 3370 public ValueBytes getValue() throws IOException { 3371 return rawValue; 3372 } 3373 @Override 3374 public boolean next() throws IOException { 3375 if (size() == 0) 3376 return false; 3377 if (minSegment != null) { 3378 //minSegment is non-null for all invocations of next except the first 3379 //one. For the first invocation, the priority queue is ready for use 3380 //but for the subsequent invocations, first adjust the queue 3381 adjustPriorityQueue(minSegment); 3382 if (size() == 0) { 3383 minSegment = null; 3384 return false; 3385 } 3386 } 3387 minSegment = (SegmentDescriptor)top(); 3388 long startPos = minSegment.in.getPosition(); // Current position in stream 3389 //save the raw key reference 3390 rawKey = minSegment.getKey(); 3391 //load the raw value. 
Re-use the existing rawValue buffer 3392 if (rawValue == null) { 3393 rawValue = minSegment.in.createValueBytes(); 3394 } 3395 minSegment.nextRawValue(rawValue); 3396 long endPos = minSegment.in.getPosition(); // End position after reading value 3397 updateProgress(endPos - startPos); 3398 return true; 3399 } 3400 3401 @Override 3402 public Progress getProgress() { 3403 return mergeProgress; 3404 } 3405 3406 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3407 long startPos = ms.in.getPosition(); // Current position in stream 3408 boolean hasNext = ms.nextRawKey(); 3409 long endPos = ms.in.getPosition(); // End position after reading key 3410 updateProgress(endPos - startPos); 3411 if (hasNext) { 3412 adjustTop(); 3413 } else { 3414 pop(); 3415 ms.cleanup(); 3416 } 3417 } 3418 3419 private void updateProgress(long bytesProcessed) { 3420 totalBytesProcessed += bytesProcessed; 3421 if (progPerByte > 0) { 3422 mergeProgress.set(totalBytesProcessed * progPerByte); 3423 } 3424 } 3425 3426 /** This is the single level merge that is called multiple times 3427 * depending on the factor size and the number of segments 3428 * @return RawKeyValueIterator 3429 * @throws IOException 3430 */ 3431 public RawKeyValueIterator merge() throws IOException { 3432 //create the MergeStreams from the sorted map created in the constructor 3433 //and dump the final output to a file 3434 int numSegments = sortedSegmentSizes.size(); 3435 int origFactor = factor; 3436 int passNo = 1; 3437 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3438 do { 3439 //get the factor for this pass of merge 3440 factor = getPassFactor(passNo, numSegments); 3441 List<SegmentDescriptor> segmentsToMerge = 3442 new ArrayList<SegmentDescriptor>(); 3443 int segmentsConsidered = 0; 3444 int numSegmentsToConsider = factor; 3445 while (true) { 3446 //extract the smallest 'factor' number of segment pointers from the 3447 //TreeMap. Call cleanup on the empty segments (no key/value data) 3448 SegmentDescriptor[] mStream = 3449 getSegmentDescriptors(numSegmentsToConsider); 3450 for (int i = 0; i < mStream.length; i++) { 3451 if (mStream[i].nextRawKey()) { 3452 segmentsToMerge.add(mStream[i]); 3453 segmentsConsidered++; 3454 // Count the fact that we read some bytes in calling nextRawKey() 3455 updateProgress(mStream[i].in.getPosition()); 3456 } 3457 else { 3458 mStream[i].cleanup(); 3459 numSegments--; //we ignore this segment for the merge 3460 } 3461 } 3462 //if we have the desired number of segments 3463 //or looked at all available segments, we break 3464 if (segmentsConsidered == factor || 3465 sortedSegmentSizes.size() == 0) { 3466 break; 3467 } 3468 3469 numSegmentsToConsider = factor - segmentsConsidered; 3470 } 3471 //feed the streams to the priority queue 3472 initialize(segmentsToMerge.size()); clear(); 3473 for (int i = 0; i < segmentsToMerge.size(); i++) { 3474 put(segmentsToMerge.get(i)); 3475 } 3476 //if we have lesser number of segments remaining, then just return the 3477 //iterator, else do another single level merge 3478 if (numSegments <= factor) { 3479 //calculate the length of the remaining segments. 
Required for 3480 //calculating the merge progress 3481 long totalBytes = 0; 3482 for (int i = 0; i < segmentsToMerge.size(); i++) { 3483 totalBytes += segmentsToMerge.get(i).segmentLength; 3484 } 3485 if (totalBytes != 0) //being paranoid 3486 progPerByte = 1.0f / (float)totalBytes; 3487 //reset factor to what it originally was 3488 factor = origFactor; 3489 return this; 3490 } else { 3491 //we want to spread the creation of temp files on multiple disks if 3492 //available under the space constraints 3493 long approxOutputSize = 0; 3494 for (SegmentDescriptor s : segmentsToMerge) { 3495 approxOutputSize += s.segmentLength + 3496 ChecksumFileSystem.getApproxChkSumLength( 3497 s.segmentLength); 3498 } 3499 Path tmpFilename = 3500 new Path(tmpDir, "intermediate").suffix("." + passNo); 3501 3502 Path outputFile = lDirAlloc.getLocalPathForWrite( 3503 tmpFilename.toString(), 3504 approxOutputSize, conf); 3505 if(LOG.isDebugEnabled()) { 3506 LOG.debug("writing intermediate results to " + outputFile); 3507 } 3508 Writer writer = cloneFileAttributes( 3509 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3510 fs.makeQualified(outputFile), null); 3511 writer.sync = null; //disable sync for temp files 3512 writeFile(this, writer); 3513 writer.close(); 3514 3515 //we finished one single level merge; now clean up the priority 3516 //queue 3517 this.close(); 3518 3519 SegmentDescriptor tempSegment = 3520 new SegmentDescriptor(0, 3521 fs.getFileStatus(outputFile).getLen(), outputFile); 3522 //put the segment back in the TreeMap 3523 sortedSegmentSizes.put(tempSegment, null); 3524 numSegments = sortedSegmentSizes.size(); 3525 passNo++; 3526 } 3527 //we are worried about only the first pass merge factor. So reset the 3528 //factor to what it originally was 3529 factor = origFactor; 3530 } while(true); 3531 } 3532 3533 //Hadoop-591 3534 public int getPassFactor(int passNo, int numSegments) { 3535 if (passNo > 1 || numSegments <= factor || factor == 1) 3536 return factor; 3537 int mod = (numSegments - 1) % (factor - 1); 3538 if (mod == 0) 3539 return factor; 3540 return mod + 1; 3541 } 3542 3543 /** Return (& remove) the requested number of segment descriptors from the 3544 * sorted map. 3545 */ 3546 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3547 if (numDescriptors > sortedSegmentSizes.size()) 3548 numDescriptors = sortedSegmentSizes.size(); 3549 SegmentDescriptor[] SegmentDescriptors = 3550 new SegmentDescriptor[numDescriptors]; 3551 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3552 int i = 0; 3553 while (i < numDescriptors) { 3554 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3555 iter.remove(); 3556 } 3557 return SegmentDescriptors; 3558 } 3559 } // SequenceFile.Sorter.MergeQueue 3560 3561 /** This class defines a merge segment. This class can be subclassed to 3562 * provide a customized cleanup method implementation. In this 3563 * implementation, cleanup closes the file handle and deletes the file 3564 */ 3565 public class SegmentDescriptor implements Comparable { 3566 3567 long segmentOffset; //the start of the segment in the file 3568 long segmentLength; //the length of the segment 3569 Path segmentPathName; //the path name of the file containing the segment 3570 boolean ignoreSync = true; //set to true for temp files 3571 private Reader in = null; 3572 private DataOutputBuffer rawKey = null; //this will hold the current key 3573 private boolean preserveInput = false; //delete input segment files? 
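      // Segments order by segmentLength, then segmentOffset, then path name
      // (see compareTo below); this is the ordering that keeps MergeQueue's
      // TreeMap of segments sorted by size with deterministic tie-breaking.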
    /** This class defines a merge segment. This class can be subclassed to
     * provide a customized cleanup method implementation. In this
     * implementation, cleanup closes the file handle and deletes the file.
     */
    public class SegmentDescriptor implements Comparable {

      long segmentOffset; //the start of the segment in the file
      long segmentLength; //the length of the segment
      Path segmentPathName; //the path name of the file containing the segment
      boolean ignoreSync = true; //set to true for temp files
      private Reader in = null;
      private DataOutputBuffer rawKey = null; //this will hold the current key
      private boolean preserveInput = false; //whether to preserve the input segment files

      /** Constructs a segment
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       */
      public SegmentDescriptor(long segmentOffset, long segmentLength,
                               Path segmentPathName) {
        this.segmentOffset = segmentOffset;
        this.segmentLength = segmentLength;
        this.segmentPathName = segmentPathName;
      }

      /** Enable the sync checks, so sync markers are honored when reading
       * this segment */
      public void doSync() { ignoreSync = false; }

      /** Whether to preserve the input files when they are no longer needed */
      public void preserveInput(boolean preserve) {
        preserveInput = preserve;
      }

      public boolean shouldPreserveInput() {
        return preserveInput;
      }

      @Override
      public int compareTo(Object o) {
        SegmentDescriptor that = (SegmentDescriptor)o;
        if (this.segmentLength != that.segmentLength) {
          return (this.segmentLength < that.segmentLength ? -1 : 1);
        }
        if (this.segmentOffset != that.segmentOffset) {
          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
        }
        return (this.segmentPathName.toString()).
          compareTo(that.segmentPathName.toString());
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof SegmentDescriptor)) {
          return false;
        }
        SegmentDescriptor that = (SegmentDescriptor)o;
        if (this.segmentLength == that.segmentLength &&
            this.segmentOffset == that.segmentOffset &&
            this.segmentPathName.toString().equals(
              that.segmentPathName.toString())) {
          return true;
        }
        return false;
      }

      @Override
      public int hashCode() {
        // Consistent with equals(): equal descriptors have equal
        // segmentOffsets and hence equal hash codes
        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
      }

      /** Fills up the rawKey object with the key returned by the Reader
       * @return true if there is a key returned; false otherwise
       * @throws IOException
       */
      public boolean nextRawKey() throws IOException {
        if (in == null) {
          int bufferSize = getBufferSize(conf);
          Reader reader = new Reader(conf,
                                     Reader.file(segmentPathName),
                                     Reader.bufferSize(bufferSize),
                                     Reader.start(segmentOffset),
                                     Reader.length(segmentLength));

          //sometimes we ignore syncs, especially for temp merge files
          if (ignoreSync) reader.ignoreSync();

          if (reader.getKeyClass() != keyClass)
            throw new IOException("wrong key class: " + reader.getKeyClass() +
                                  " is not " + keyClass);
          if (reader.getValueClass() != valClass)
            throw new IOException("wrong value class: " + reader.getValueClass() +
                                  " is not " + valClass);
          this.in = reader;
          rawKey = new DataOutputBuffer();
        }
        rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
        return (keyLength >= 0);
      }

      /** Fills up the passed rawValue with the value corresponding to the key
       * read earlier.
       * @param rawValue the buffer to read the raw value into
       * @return the length of the value
       * @throws IOException
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        int valLength = in.nextRawValue(rawValue);
        return valLength;
      }

      /** Returns the stored rawKey */
      public DataOutputBuffer getKey() {
        return rawKey;
      }

      /** Closes the underlying reader */
      private void close() throws IOException {
        this.in.close();
        this.in = null;
      }
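      // cleanup() is invoked by the merge (see adjustPriorityQueue above)
      // once a segment is exhausted; unless preserveInput was requested, the
      // segment's backing file is deleted, which is what allows intermediate
      // merge outputs to be reclaimed as soon as they have been consumed.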
      /** The default cleanup. Subclasses can override this with a custom
       * cleanup.
       */
      public void cleanup() throws IOException {
        close();
        if (!preserveInput) {
          fs.delete(segmentPathName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentDescriptor

    /** This class describes a segment that shares its backing file with
     * other segments in the same file.
     */
    private class LinkedSegmentsDescriptor extends SegmentDescriptor {

      SegmentContainer parentContainer = null;

      /** Constructs a segment
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       * @param parent the parent SegmentContainer that holds the segment
       */
      public LinkedSegmentsDescriptor(long segmentOffset, long segmentLength,
                                      Path segmentPathName,
                                      SegmentContainer parent) {
        super(segmentOffset, segmentLength, segmentPathName);
        this.parentContainer = parent;
      }

      /** Closes the reader and notifies the parent container, which deletes
       * the shared file once all of its segments have been cleaned up.
       */
      @Override
      public void cleanup() throws IOException {
        super.close();
        if (super.shouldPreserveInput()) return;
        parentContainer.cleanup();
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof LinkedSegmentsDescriptor)) {
          return false;
        }
        return super.equals(o);
      }
    } // SequenceFile.Sorter.LinkedSegmentsDescriptor

    /** The class that defines a container for segments to be merged. Primarily
     * required to delete temp files as soon as all the contained segments
     * have been looked at.
     */
    private class SegmentContainer {
      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
      private int numSegmentsContained; //# of segments contained
      private Path inName; //input file from where segments are created

      //the list of segments read from the file
      private ArrayList<SegmentDescriptor> segments =
        new ArrayList<SegmentDescriptor>();

      /** This constructor is there primarily to serve the sort routine that
       * generates a single output file with an associated index file.
       */
      public SegmentContainer(Path inName, Path indexIn) throws IOException {
        //get the segments from indexIn
        FSDataInputStream fsIndexIn = fs.open(indexIn);
        long end = fs.getFileStatus(indexIn).getLen();
        while (fsIndexIn.getPos() < end) {
          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
          long segmentLength = WritableUtils.readVLong(fsIndexIn);
          Path segmentName = inName;
          segments.add(new LinkedSegmentsDescriptor(segmentOffset,
                                                    segmentLength,
                                                    segmentName, this));
        }
        fsIndexIn.close();
        fs.delete(indexIn, true);
        numSegmentsContained = segments.size();
        this.inName = inName;
      }

      public List<SegmentDescriptor> getSegmentList() {
        return segments;
      }

      public void cleanup() throws IOException {
        numSegmentsCleanedUp++;
        if (numSegmentsCleanedUp == numSegmentsContained) {
          fs.delete(inName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentContainer

  } // SequenceFile.Sorter

} // SequenceFile
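/* Illustrative sketch (not part of this file): typical use of the Sorter
 * defined above. Assumes the key class is a WritableComparable (Text here)
 * and that "unsorted.seq" exists; error handling omitted.
 *
 *   Configuration conf = new Configuration();
 *   FileSystem fs = FileSystem.getLocal(conf);
 *   SequenceFile.Sorter sorter =
 *       new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);
 *   sorter.setFactor(100);                // streams merged per pass
 *   sorter.setMemory(64 * 1024 * 1024);   // in-memory sort buffer, in bytes
 *   sorter.sort(new Path("unsorted.seq"), new Path("sorted.seq"));
 */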