/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;

/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
 * {@link Sorter} classes for writing, reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can
 * be specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link Reader} acts as the bridge and can read any of the above
 * <code>SequenceFile</code> formats.</p>
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are three different formats for
 * <code>SequenceFile</code>s depending on the <code>CompressionType</code>
 * specified. All of them share a <a href="#Header">common header</a> described
 * below.
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every few hundred bytes or so.
 *   </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every few hundred bytes or so.
 *   </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 *   <li>
 *   <a href="#Header">Header</a>
 *   </li>
 *   <li>
 *   Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 *   </li>
 *   <li>
 *   A sync-marker every block.
 *   </li>
 * </ul>
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
 * format.</p>
 *
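 * <p>A minimal usage sketch. The path <code>"example.seq"</code> and the
 * key/value classes below are illustrative choices, not anything prescribed
 * by this API:</p>
 *
 * <pre>
 * Configuration conf = new Configuration();
 * Path path = new Path("example.seq");
 *
 * // Write a few key/value pairs.
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(LongWritable.class),
 *     SequenceFile.Writer.valueClass(Text.class));
 * try {
 *   writer.append(new LongWritable(1), new Text("one"));
 *   writer.append(new LongWritable(2), new Text("two"));
 * } finally {
 *   writer.close();
 * }
 *
 * // Read them back; the Reader detects the format from the header.
 * SequenceFile.Reader reader =
 *     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 * try {
 *   LongWritable key = new LongWritable();
 *   Text val = new Text();
 *   while (reader.next(key, val)) {
 *     System.out.println(key + "\t" + val);
 *   }
 * } finally {
 *   reader.close();
 * }
 * </pre>
 *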
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static final byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  public static CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  public static void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }
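
  /* A sketch of option-based writer creation. The path, classes and the
   * compression choice here are assumptions for illustration, not defaults
   * of this API:
   *
   *   Configuration conf = new Configuration();
   *   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *       SequenceFile.Writer.file(new Path("example.seq")),
   *       SequenceFile.Writer.keyClass(Text.class),
   *       SequenceFile.Writer.valueClass(IntWritable.class),
   *       SequenceFile.Writer.compression(CompressionType.BLOCK,
   *                                       new DefaultCodec()));
   *
   * If no compression option is given, createWriter above falls back to
   * getDefaultCompressionType(conf), i.e. the value of
   * "io.seqfile.compression.type" (RECORD when unset).
   */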

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize, boolean createParent,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
    createWriter(FileContext fc, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Metadata metadata,
                 final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
    throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream.
     * Note that it will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    public int getSize() {
      return dataSize;
    }

    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    public int getSize() {
      return dataSize;
    }

    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0)
        throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t")
          .append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
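
  /* A small sketch of attaching metadata at creation time. The attribute
   * names and values here are arbitrary examples:
   *
   *   SequenceFile.Metadata metadata = new SequenceFile.Metadata();
   *   metadata.set(new Text("created-by"), new Text("example-job"));
   *   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *       SequenceFile.Writer.file(path),
   *       SequenceFile.Writer.keyClass(Text.class),
   *       SequenceFile.Writer.valueClass(Text.class),
   *       SequenceFile.Writer.metadata(metadata));
   *
   * The metadata is written once into the file header and can be read back
   * from SequenceFile.Reader#getMetadata().
   */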

  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = System.currentTimeMillis();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                            implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     *     methods that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                  implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter
     *     methods that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
                                     CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf, Option... opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException("file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();
        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut =
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        this.compressedValSerializer.open(deflateOut);
      }
      writeFileHeader();
    }

    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }

    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);            // mark the start of the sync
        out.write(sync);                      // write sync
        lastSyncPos = out.getPos();           // update lastSyncPos
      }
    }

    /** Flush all currently written data to the file system. */
    public void syncFs() throws IOException {
      if (out != null) {
        out.sync();                           // flush contents to file system
      }
    }

    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }

    /** Close the file. */
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;

      if (out != null) {

        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                      // sync
      out.writeInt(buffer.getLength());         // total record length
      out.writeInt(keyLength);                  // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);        // total record length
      out.writeInt(keyLength);                  // key portion length
      out.write(keyData, keyOffset, keyLength); // key
      val.writeUncompressedBytes(out);          // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position. In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a
     * position returned by this method,
     * {@link SequenceFile.Reader#next(Writable)} may be called. However, the
     * key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }

  } // class Writer

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                      // sync
      out.writeInt(buffer.getLength());         // total record length
      out.writeInt(keyLength);                  // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressionWriter

  /** Write compressed key/value blocks to a sequence-format file. */
  static class BlockCompressWriter extends Writer {

    private int noBufferedRecords = 0;

    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer keyBuffer = new DataOutputBuffer();

    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
    private DataOutputBuffer valBuffer = new DataOutputBuffer();

    private final int compressionBlockSize;

    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      compressionBlockSize =
        conf.getInt("io.seqfile.compress.blocksize", 1000000);
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }

    /** Workhorse to check and write out compressed data/lengths */
    private synchronized
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer)
      throws IOException {
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0,
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();

      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }

    /** Compress and flush contents to dfs */
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        super.sync();

        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);

        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);

        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);

        // Flush the file-stream
        out.flush();

        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }

    }

    /** Close the file. */
    public synchronized void close() throws IOException {
      if (out != null) {
        sync();
      }
      super.close();
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key+" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      // Save key/value into respective buffers
      int oldKeyLength = keyBuffer.getLength();
      keySerializer.serialize(key);
      int keyLength = keyBuffer.getLength() - oldKeyLength;
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);
      WritableUtils.writeVInt(keyLenBuffer, keyLength);

      int oldValLength = valBuffer.getLength();
      uncompressedValSerializer.serialize(val);
      int valLength = valBuffer.getLength() - oldValLength;
      WritableUtils.writeVInt(valLenBuffer, valLength);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

    /** Append a key/value pair. */
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed");

      int valLength = val.getSize();

      // Save key/value data in relevant buffers
      WritableUtils.writeVInt(keyLenBuffer, keyLength);
      keyBuffer.write(keyData, keyOffset, keyLength);
      WritableUtils.writeVInt(valLenBuffer, valLength);
      val.writeUncompressedBytes(valBuffer);

      // Added another key/value pair
      ++noBufferedRecords;

      // Compress and flush?
      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
      if (currentBlockSize >= compressionBlockSize) {
        sync();
      }
    }

  } // BlockCompressionWriter

  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    return conf.getInt("io.file.buffer.size", 4096);
  }
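
  /* Tuning sketch (the values are assumptions for illustration):
   * BlockCompressWriter buffers serialized keys and values until their
   * combined size reaches "io.seqfile.compress.blocksize" (1,000,000 bytes
   * by default, per the getInt call above), then compresses and flushes the
   * block. Larger blocks generally compress better at the cost of writer
   * memory and coarser seek granularity:
   *
   *   Configuration conf = new Configuration();
   *   conf.setInt("io.seqfile.compress.blocksize", 4 * 1024 * 1024);
   *   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *       SequenceFile.Writer.file(new Path("blocks.seq")),
   *       SequenceFile.Writer.keyClass(Text.class),
   *       SequenceFile.Writer.valueClass(BytesWritable.class),
   *       SequenceFile.Writer.compression(CompressionType.BLOCK));
   */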

  /** Reads key/value pairs from a sequence-format file. */
  public static class Reader implements java.io.Closeable {
    private String filename;
    private FSDataInputStream in;
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    private byte version;

    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;

    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    private boolean decompress;
    private boolean blockCompressed;

    private Configuration conf;

    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;

    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;

    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;

    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;

    /**
     * A tag interface for all of the Reader options
     */
    public static interface Option {}

    /**
     * Create an option to specify the path name of the sequence file.
     * @param value the path to read
     * @return a new option
     */
    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * Create an option to specify the stream with the sequence file.
     * @param value the stream to read.
     * @return a new option
     */
    public static Option stream(FSDataInputStream value) {
      return new InputStreamOption(value);
    }

    /**
     * Create an option to specify the starting byte to read.
     * @param value the number of bytes to skip over
     * @return a new option
     */
    public static Option start(long value) {
      return new StartOption(value);
    }

    /**
     * Create an option to specify the number of bytes to read.
     * @param value the number of bytes to read
     * @return a new option
     */
    public static Option length(long value) {
      return new LengthOption(value);
    }

    /**
     * Create an option with the buffer size for reading the given pathname.
     * @param value the number of bytes to buffer
     * @return a new option
     */
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    private static class FileOption extends Options.PathOption
                                    implements Option {
      private FileOption(Path value) {
        super(value);
      }
    }

    private static class InputStreamOption
        extends Options.FSDataInputStreamOption
        implements Option {
      private InputStreamOption(FSDataInputStream value) {
        super(value);
      }
    }

    private static class StartOption extends Options.LongOption
                                     implements Option {
      private StartOption(long value) {
        super(value);
      }
    }

    private static class LengthOption extends Options.LongOption
                                      implements Option {
      private LengthOption(long value) {
        super(value);
      }
    }

    private static class BufferSizeOption extends Options.IntegerOption
                                          implements Option {
      private BufferSizeOption(int value) {
        super(value);
      }
    }

    // only used directly
    private static class OnlyHeaderOption extends Options.BooleanOption
                                          implements Option {
      private OnlyHeaderOption() {
        super(true);
      }
    }

    public Reader(Configuration conf, Option... opts) throws IOException {
      // Look up the options, these are null if not set
      FileOption fileOpt = Options.getOption(FileOption.class, opts);
      InputStreamOption streamOpt =
        Options.getOption(InputStreamOption.class, opts);
      StartOption startOpt = Options.getOption(StartOption.class, opts);
      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
      OnlyHeaderOption headerOnly =
        Options.getOption(OnlyHeaderOption.class, opts);
      // check for consistency
      if ((fileOpt == null) == (streamOpt == null)) {
        throw new
          IllegalArgumentException("File or stream option must be specified");
      }
      if (fileOpt == null && bufOpt != null) {
        throw new IllegalArgumentException("buffer size can only be set when" +
                                           " a file is specified.");
      }
      // figure out the real values
      Path filename = null;
      FSDataInputStream file;
      final long len;
      if (fileOpt != null) {
        filename = fileOpt.getValue();
        FileSystem fs = filename.getFileSystem(conf);
        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
        len = null == lenOpt
          ? fs.getFileStatus(filename).getLen()
          : lenOpt.getValue();
        file = openFile(fs, filename, bufSize, len);
      } else {
        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
        file = streamOpt.getValue();
      }
      long start = startOpt == null ? 0 : startOpt.getValue();
      // really set up
      initialize(filename, file, start, len, conf, headerOnly != null);
    }
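
    /* A sketch of reading one byte range of a file (the offsets are
     * illustrative). When starting from an arbitrary position, callers
     * typically advance to the next sync marker, e.g. via Reader#sync(long),
     * before iterating:
     *
     *   SequenceFile.Reader reader = new SequenceFile.Reader(conf,
     *       SequenceFile.Reader.file(path),
     *       SequenceFile.Reader.start(splitStart),
     *       SequenceFile.Reader.length(splitLength));
     */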
1715 * @param buffersize unused 1716 * @param start The starting position. 1717 * @param length The length being read. 1718 * @param conf Configuration 1719 * @throws IOException 1720 * @deprecated Use Reader(Configuration, Reader.Option...) instead. 1721 */ 1722 @Deprecated 1723 public Reader(FSDataInputStream in, int buffersize, 1724 long start, long length, Configuration conf) throws IOException { 1725 this(conf, stream(in), start(start), length(length)); 1726 } 1727 1728 /** Common work of the constructors. */ 1729 private void initialize(Path filename, FSDataInputStream in, 1730 long start, long length, Configuration conf, 1731 boolean tempReader) throws IOException { 1732 if (in == null) { 1733 throw new IllegalArgumentException("in == null"); 1734 } 1735 this.filename = filename == null ? "<unknown>" : filename.toString(); 1736 this.in = in; 1737 this.conf = conf; 1738 boolean succeeded = false; 1739 try { 1740 seek(start); 1741 this.end = this.in.getPos() + length; 1742 // if it wrapped around, use the max 1743 if (end < length) { 1744 end = Long.MAX_VALUE; 1745 } 1746 init(tempReader); 1747 succeeded = true; 1748 } finally { 1749 if (!succeeded) { 1750 IOUtils.cleanup(LOG, this.in); 1751 } 1752 } 1753 } 1754 1755 /** 1756 * Override this method to specialize the type of 1757 * {@link FSDataInputStream} returned. 1758 * @param fs The file system used to open the file. 1759 * @param file The file being read. 1760 * @param bufferSize The buffer size used to read the file. 1761 * @param length The length being read if it is >= 0. Otherwise, 1762 * the length is not available. 1763 * @return The opened stream. 1764 * @throws IOException 1765 */ 1766 protected FSDataInputStream openFile(FileSystem fs, Path file, 1767 int bufferSize, long length) throws IOException { 1768 return fs.open(file, bufferSize); 1769 } 1770 1771 /** 1772 * Initialize the {@link Reader}. 1773 * @param tempReader <code>true</code> if we are constructing a temporary 1774 * reader (see {@link SequenceFile.Sorter#cloneFileAttributes}), 1775 * and hence do not initialize every component; 1776 * <code>false</code> otherwise. 1777 * @throws IOException 1778 */ 1779 private void init(boolean tempReader) throws IOException { 1780 byte[] versionBlock = new byte[VERSION.length]; 1781 in.readFully(versionBlock); 1782 1783 if ((versionBlock[0] != VERSION[0]) || 1784 (versionBlock[1] != VERSION[1]) || 1785 (versionBlock[2] != VERSION[2])) 1786 throw new IOException(this + " is not a SequenceFile"); 1787 1788 // Set 'version' 1789 version = versionBlock[3]; 1790 if (version > VERSION[3]) 1791 throw new VersionMismatchException(VERSION[3], version); 1792 1793 if (version < BLOCK_COMPRESS_VERSION) { 1794 UTF8 className = new UTF8(); 1795 1796 className.readFields(in); 1797 keyClassName = className.toString(); // key class name 1798 1799 className.readFields(in); 1800 valClassName = className.toString(); // val class name 1801 } else { 1802 keyClassName = Text.readString(in); 1803 valClassName = Text.readString(in); 1804 } 1805 1806 if (version > 2) { // if version > 2 1807 this.decompress = in.readBoolean(); // is compressed? 1808 } else { 1809 decompress = false; 1810 } 1811 1812 if (version >= BLOCK_COMPRESS_VERSION) { // if version >= 4 1813 this.blockCompressed = in.readBoolean(); // is block-compressed?
1814 } else { 1815 blockCompressed = false; 1816 } 1817 1818 // if version >= 5 1819 // setup the compression codec 1820 if (decompress) { 1821 if (version >= CUSTOM_COMPRESS_VERSION) { 1822 String codecClassname = Text.readString(in); 1823 try { 1824 Class<? extends CompressionCodec> codecClass 1825 = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class); 1826 this.codec = ReflectionUtils.newInstance(codecClass, conf); 1827 } catch (ClassNotFoundException cnfe) { 1828 throw new IllegalArgumentException("Unknown codec: " + 1829 codecClassname, cnfe); 1830 } 1831 } else { 1832 codec = new DefaultCodec(); 1833 ((Configurable)codec).setConf(conf); 1834 } 1835 } 1836 1837 this.metadata = new Metadata(); 1838 if (version >= VERSION_WITH_METADATA) { // if version >= 6 1839 this.metadata.readFields(in); 1840 } 1841 1842 if (version > 1) { // if version > 1 1843 in.readFully(sync); // read sync bytes 1844 headerEnd = in.getPos(); // record end of header 1845 } 1846 1847 // Initialize... *not* if we are constructing a temporary Reader 1848 if (!tempReader) { 1849 valBuffer = new DataInputBuffer(); 1850 if (decompress) { 1851 valDecompressor = CodecPool.getDecompressor(codec); 1852 valInFilter = codec.createInputStream(valBuffer, valDecompressor); 1853 valIn = new DataInputStream(valInFilter); 1854 } else { 1855 valIn = valBuffer; 1856 } 1857 1858 if (blockCompressed) { 1859 keyLenBuffer = new DataInputBuffer(); 1860 keyBuffer = new DataInputBuffer(); 1861 valLenBuffer = new DataInputBuffer(); 1862 1863 keyLenDecompressor = CodecPool.getDecompressor(codec); 1864 keyLenInFilter = codec.createInputStream(keyLenBuffer, 1865 keyLenDecompressor); 1866 keyLenIn = new DataInputStream(keyLenInFilter); 1867 1868 keyDecompressor = CodecPool.getDecompressor(codec); 1869 keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor); 1870 keyIn = new DataInputStream(keyInFilter); 1871 1872 valLenDecompressor = CodecPool.getDecompressor(codec); 1873 valLenInFilter = codec.createInputStream(valLenBuffer, 1874 valLenDecompressor); 1875 valLenIn = new DataInputStream(valLenInFilter); 1876 } 1877 1878 SerializationFactory serializationFactory = 1879 new SerializationFactory(conf); 1880 this.keyDeserializer = 1881 getDeserializer(serializationFactory, getKeyClass()); 1882 if (!blockCompressed) { 1883 this.keyDeserializer.open(valBuffer); 1884 } else { 1885 this.keyDeserializer.open(keyIn); 1886 } 1887 this.valDeserializer = 1888 getDeserializer(serializationFactory, getValueClass()); 1889 this.valDeserializer.open(valIn); 1890 } 1891 } 1892 1893 @SuppressWarnings("unchecked") 1894 private Deserializer getDeserializer(SerializationFactory sf, Class c) { 1895 return sf.getDeserializer(c); 1896 } 1897 1898 /** Close the file. */ 1899 public synchronized void close() throws IOException { 1900 // Return the decompressors to the pool 1901 CodecPool.returnDecompressor(keyLenDecompressor); 1902 CodecPool.returnDecompressor(keyDecompressor); 1903 CodecPool.returnDecompressor(valLenDecompressor); 1904 CodecPool.returnDecompressor(valDecompressor); 1905 keyLenDecompressor = keyDecompressor = null; 1906 valLenDecompressor = valDecompressor = null; 1907 1908 if (keyDeserializer != null) { 1909 keyDeserializer.close(); 1910 } 1911 if (valDeserializer != null) { 1912 valDeserializer.close(); 1913 } 1914 1915 // Close the input-stream 1916 in.close(); 1917 } 1918 1919 /** Returns the name of the key class.
*/ 1920 public String getKeyClassName() { 1921 return keyClassName; 1922 } 1923 1924 /** Returns the class of keys in this file. */ 1925 public synchronized Class<?> getKeyClass() { 1926 if (null == keyClass) { 1927 try { 1928 keyClass = WritableName.getClass(getKeyClassName(), conf); 1929 } catch (IOException e) { 1930 throw new RuntimeException(e); 1931 } 1932 } 1933 return keyClass; 1934 } 1935 1936 /** Returns the name of the value class. */ 1937 public String getValueClassName() { 1938 return valClassName; 1939 } 1940 1941 /** Returns the class of values in this file. */ 1942 public synchronized Class<?> getValueClass() { 1943 if (null == valClass) { 1944 try { 1945 valClass = WritableName.getClass(getValueClassName(), conf); 1946 } catch (IOException e) { 1947 throw new RuntimeException(e); 1948 } 1949 } 1950 return valClass; 1951 } 1952 1953 /** Returns true if values are compressed. */ 1954 public boolean isCompressed() { return decompress; } 1955 1956 /** Returns true if records are block-compressed. */ 1957 public boolean isBlockCompressed() { return blockCompressed; } 1958 1959 /** Returns the compression codec of data in this file. */ 1960 public CompressionCodec getCompressionCodec() { return codec; } 1961 1962 /** 1963 * Get the compression type for this file. 1964 * @return the compression type 1965 */ 1966 public CompressionType getCompressionType() { 1967 if (decompress) { 1968 return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; 1969 } else { 1970 return CompressionType.NONE; 1971 } 1972 } 1973 1974 /** Returns the metadata object of the file */ 1975 public Metadata getMetadata() { 1976 return this.metadata; 1977 } 1978 1979 /** Returns the configuration used for this file. */ 1980 Configuration getConf() { return conf; } 1981 1982 /** Read a compressed buffer */ 1983 private synchronized void readBuffer(DataInputBuffer buffer, 1984 CompressionInputStream filter) throws IOException { 1985 // Read data into a temporary buffer 1986 DataOutputBuffer dataBuffer = new DataOutputBuffer(); 1987 1988 try { 1989 int dataBufferLength = WritableUtils.readVInt(in); 1990 dataBuffer.write(in, dataBufferLength); 1991 1992 // Set up 'buffer' connected to the input-stream 1993 buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength()); 1994 } finally { 1995 dataBuffer.close(); 1996 } 1997 1998 // Reset the codec 1999 filter.resetState(); 2000 } 2001 2002 /** Read the next 'compressed' block */ 2003 private synchronized void readBlock() throws IOException { 2004 // Check if we need to throw away a whole block of 2005 // 'values' due to 'lazy decompression' 2006 if (lazyDecompress && !valuesDecompressed) { 2007 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2008 in.seek(WritableUtils.readVInt(in)+in.getPos()); 2009 } 2010 2011 // Reset internal states 2012 noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0; 2013 valuesDecompressed = false; 2014 2015 //Process sync 2016 if (sync != null) { 2017 in.readInt(); 2018 in.readFully(syncCheck); // read syncCheck 2019 if (!Arrays.equals(sync, syncCheck)) // check it 2020 throw new IOException("File is corrupt!"); 2021 } 2022 syncSeen = true; 2023 2024 // Read number of records in this block 2025 noBufferedRecords = WritableUtils.readVInt(in); 2026 2027 // Read key lengths and keys 2028 readBuffer(keyLenBuffer, keyLenInFilter); 2029 readBuffer(keyBuffer, keyInFilter); 2030 noBufferedKeys = noBufferedRecords; 2031 2032 // Read value lengths and values 2033 if (!lazyDecompress) { 2034 readBuffer(valLenBuffer, 
valLenInFilter); 2035 readBuffer(valBuffer, valInFilter); 2036 noBufferedValues = noBufferedRecords; 2037 valuesDecompressed = true; 2038 } 2039 } 2040 2041 /** 2042 * Position valLenIn/valIn to the 'value' 2043 * corresponding to the 'current' key. 2044 */ 2045 private synchronized void seekToCurrentValue() throws IOException { 2046 if (!blockCompressed) { 2047 if (decompress) { 2048 valInFilter.resetState(); 2049 } 2050 valBuffer.reset(); 2051 } else { 2052 // Check if this is the first value in the 'block' to be read 2053 if (lazyDecompress && !valuesDecompressed) { 2054 // Read the value lengths and values 2055 readBuffer(valLenBuffer, valLenInFilter); 2056 readBuffer(valBuffer, valInFilter); 2057 noBufferedValues = noBufferedRecords; 2058 valuesDecompressed = true; 2059 } 2060 2061 // Calculate the no. of bytes to skip 2062 // Note: 'current' key has already been read! 2063 int skipValBytes = 0; 2064 int currentKey = noBufferedKeys + 1; 2065 for (int i=noBufferedValues; i > currentKey; --i) { 2066 skipValBytes += WritableUtils.readVInt(valLenIn); 2067 --noBufferedValues; 2068 } 2069 2070 // Skip to the 'val' corresponding to 'current' key 2071 if (skipValBytes > 0) { 2072 if (valIn.skipBytes(skipValBytes) != skipValBytes) { 2073 throw new IOException("Failed to seek to " + currentKey + 2074 "(th) value!"); 2075 } 2076 } 2077 } 2078 } 2079 2080 /** 2081 * Get the 'value' corresponding to the last read 'key'. 2082 * @param val : The 'value' to be read. 2083 * @throws IOException 2084 */ 2085 public synchronized void getCurrentValue(Writable val) 2086 throws IOException { 2087 if (val instanceof Configurable) { 2088 ((Configurable) val).setConf(this.conf); 2089 } 2090 2091 // Position stream to 'current' value 2092 seekToCurrentValue(); 2093 2094 if (!blockCompressed) { 2095 val.readFields(valIn); 2096 2097 if (valIn.read() > 0) { 2098 LOG.info("available bytes: " + valIn.available()); 2099 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2100 + " bytes, should read " + 2101 (valBuffer.getLength()-keyLength)); 2102 } 2103 } else { 2104 // Get the value 2105 int valLength = WritableUtils.readVInt(valLenIn); 2106 val.readFields(valIn); 2107 2108 // Read another compressed 'value' 2109 --noBufferedValues; 2110 2111 // Sanity check 2112 if ((valLength < 0) && LOG.isDebugEnabled()) { 2113 LOG.debug(val + " has a negative value length"); 2114 } 2115 } 2116 2117 }
2122 * @throws IOException 2123 */ 2124 public synchronized Object getCurrentValue(Object val) 2125 throws IOException { 2126 if (val instanceof Configurable) { 2127 ((Configurable) val).setConf(this.conf); 2128 } 2129 2130 // Position stream to 'current' value 2131 seekToCurrentValue(); 2132 2133 if (!blockCompressed) { 2134 val = deserializeValue(val); 2135 2136 if (valIn.read() > 0) { 2137 LOG.info("available bytes: " + valIn.available()); 2138 throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength) 2139 + " bytes, should read " + 2140 (valBuffer.getLength()-keyLength)); 2141 } 2142 } else { 2143 // Get the value 2144 int valLength = WritableUtils.readVInt(valLenIn); 2145 val = deserializeValue(val); 2146 2147 // Read another compressed 'value' 2148 --noBufferedValues; 2149 2150 // Sanity check 2151 if ((valLength < 0) && LOG.isDebugEnabled()) { 2152 LOG.debug(val + " has a negative value length"); 2153 } 2154 } 2155 return val; 2156 2157 } 2158 2159 @SuppressWarnings("unchecked") 2160 private Object deserializeValue(Object val) throws IOException { 2161 return valDeserializer.deserialize(val); 2162 } 2163 2164 /** Read the next key in the file into <code>key</code>, skipping its 2165 * value. Returns true if another entry exists, and false at end of file. */ 2166 public synchronized boolean next(Writable key) throws IOException { 2167 if (key.getClass() != getKeyClass()) 2168 throw new IOException("wrong key class: "+key.getClass().getName() 2169 +" is not "+keyClass); 2170 2171 if (!blockCompressed) { 2172 outBuf.reset(); 2173 2174 keyLength = next(outBuf); 2175 if (keyLength < 0) 2176 return false; 2177 2178 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2179 2180 key.readFields(valBuffer); 2181 valBuffer.mark(0); 2182 if (valBuffer.getPosition() != keyLength) 2183 throw new IOException(key + " read " + valBuffer.getPosition() 2184 + " bytes, should read " + keyLength); 2185 } else { 2186 //Reset syncSeen 2187 syncSeen = false; 2188 2189 if (noBufferedKeys == 0) { 2190 try { 2191 readBlock(); 2192 } catch (EOFException eof) { 2193 return false; 2194 } 2195 } 2196 2197 int keyLength = WritableUtils.readVInt(keyLenIn); 2198 2199 // Sanity check 2200 if (keyLength < 0) { 2201 return false; 2202 } 2203 2204 //Read another compressed 'key' 2205 key.readFields(keyIn); 2206 --noBufferedKeys; 2207 } 2208 2209 return true; 2210 } 2211 2212 /** Read the next key/value pair in the file into <code>key</code> and 2213 * <code>val</code>. Returns true if such a pair exists and false when at 2214 * end of file. */ 2215 public synchronized boolean next(Writable key, Writable val) 2216 throws IOException { 2217 if (val.getClass() != getValueClass()) 2218 throw new IOException("wrong value class: "+val.getClass().getName() 2219 +" is not "+valClass); 2220 boolean more = next(key); 2221 2222 if (more) { 2223 getCurrentValue(val); 2224 } 2225 2226 return more; 2227 } 2228 2229 /** 2230 * Read and return the next record length, potentially skipping over 2231 * a sync block.
* @return the length of the next record or -1 if there is no next record 2233 * @throws IOException 2234 */ 2235 private synchronized int readRecordLength() throws IOException { 2236 if (in.getPos() >= end) { 2237 return -1; 2238 } 2239 int length = in.readInt(); 2240 if (version > 1 && sync != null && 2241 length == SYNC_ESCAPE) { // process a sync entry 2242 in.readFully(syncCheck); // read syncCheck 2243 if (!Arrays.equals(sync, syncCheck)) // check it 2244 throw new IOException("File is corrupt!"); 2245 syncSeen = true; 2246 if (in.getPos() >= end) { 2247 return -1; 2248 } 2249 length = in.readInt(); // re-read length 2250 } else { 2251 syncSeen = false; 2252 } 2253 2254 return length; 2255 } 2256 2257 /** Read the next key/value pair in the file into <code>buffer</code>. 2258 * Returns the length of the key read, or -1 if at end of file. The length 2259 * of the value may be computed by calling buffer.getLength() before and 2260 * after calls to this method. 2261 * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */ 2262 @Deprecated 2263 synchronized int next(DataOutputBuffer buffer) throws IOException { 2264 // Unsupported for block-compressed sequence files 2265 if (blockCompressed) { 2266 throw new IOException("Unsupported call for block-compressed" + 2267 " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)"); 2268 } 2269 try { 2270 int length = readRecordLength(); 2271 if (length == -1) { 2272 return -1; 2273 } 2274 int keyLength = in.readInt(); 2275 buffer.write(in, length); 2276 return keyLength; 2277 } catch (ChecksumException e) { // checksum failure 2278 handleChecksumException(e); 2279 return next(buffer); 2280 } 2281 } 2282 2283 public ValueBytes createValueBytes() { 2284 ValueBytes val = null; 2285 if (!decompress || blockCompressed) { 2286 val = new UncompressedBytes(); 2287 } else { 2288 val = new CompressedBytes(codec); 2289 } 2290 return val; 2291 } 2292 2293 /** 2294 * Read 'raw' records.
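 * <p>A typical raw-scan loop (an illustrative sketch; <code>reader</code> is
 * assumed to be an already-open <code>Reader</code>):
 * <pre>
 *   DataOutputBuffer rawKey = new DataOutputBuffer();
 *   SequenceFile.ValueBytes rawValue = reader.createValueBytes();
 *   while (reader.nextRaw(rawKey, rawValue) != -1) {
 *     // rawKey holds the serialized key; rawValue wraps the value bytes
 *     rawKey.reset();
 *   }
 * </pre>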
* @param key - The buffer into which the key is read 2296 * @param val - The 'raw' value 2297 * @return the total record length, or -1 at end of file 2298 * @throws IOException 2299 */ 2300 public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 2301 throws IOException { 2302 if (!blockCompressed) { 2303 int length = readRecordLength(); 2304 if (length == -1) { 2305 return -1; 2306 } 2307 int keyLength = in.readInt(); 2308 int valLength = length - keyLength; 2309 key.write(in, keyLength); 2310 if (decompress) { 2311 CompressedBytes value = (CompressedBytes)val; 2312 value.reset(in, valLength); 2313 } else { 2314 UncompressedBytes value = (UncompressedBytes)val; 2315 value.reset(in, valLength); 2316 } 2317 2318 return length; 2319 } else { 2320 //Reset syncSeen 2321 syncSeen = false; 2322 2323 // Read 'key' 2324 if (noBufferedKeys == 0) { 2325 if (in.getPos() >= end) 2326 return -1; 2327 2328 try { 2329 readBlock(); 2330 } catch (EOFException eof) { 2331 return -1; 2332 } 2333 } 2334 int keyLength = WritableUtils.readVInt(keyLenIn); 2335 if (keyLength < 0) { 2336 throw new IOException("negative length key found!"); 2337 } 2338 key.write(keyIn, keyLength); 2339 --noBufferedKeys; 2340 2341 // Read raw 'value' 2342 seekToCurrentValue(); 2343 int valLength = WritableUtils.readVInt(valLenIn); 2344 UncompressedBytes rawValue = (UncompressedBytes)val; 2345 rawValue.reset(valIn, valLength); 2346 --noBufferedValues; 2347 2348 return (keyLength+valLength); 2349 } 2350 2351 } 2352 2353 /** 2354 * Read 'raw' keys. 2355 * @param key - The buffer into which the key is read 2356 * @return the key length, or -1 at end of file 2357 * @throws IOException 2358 */ 2359 public synchronized int nextRawKey(DataOutputBuffer key) 2360 throws IOException { 2361 if (!blockCompressed) { 2362 recordLength = readRecordLength(); 2363 if (recordLength == -1) { 2364 return -1; 2365 } 2366 keyLength = in.readInt(); 2367 key.write(in, keyLength); 2368 return keyLength; 2369 } else { 2370 //Reset syncSeen 2371 syncSeen = false; 2372 2373 // Read 'key' 2374 if (noBufferedKeys == 0) { 2375 if (in.getPos() >= end) 2376 return -1; 2377 2378 try { 2379 readBlock(); 2380 } catch (EOFException eof) { 2381 return -1; 2382 } 2383 } 2384 int keyLength = WritableUtils.readVInt(keyLenIn); 2385 if (keyLength < 0) { 2386 throw new IOException("negative length key found!"); 2387 } 2388 key.write(keyIn, keyLength); 2389 --noBufferedKeys; 2390 2391 return keyLength; 2392 } 2393 2394 } 2395 2396 /** Read the next key in the file, skipping its 2397 * value. Return null at end of file.
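 * <p>For example (an illustrative sketch):
 * <pre>
 *   Object key = null;
 *   while ((key = reader.next(key)) != null) {
 *     // process the key; pair it with getCurrentValue(Object) if needed
 *   }
 * </pre>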
*/ 2398 public synchronized Object next(Object key) throws IOException { 2399 if (key != null && key.getClass() != getKeyClass()) { 2400 throw new IOException("wrong key class: "+key.getClass().getName() 2401 +" is not "+keyClass); 2402 } 2403 2404 if (!blockCompressed) { 2405 outBuf.reset(); 2406 2407 keyLength = next(outBuf); 2408 if (keyLength < 0) 2409 return null; 2410 2411 valBuffer.reset(outBuf.getData(), outBuf.getLength()); 2412 2413 key = deserializeKey(key); 2414 valBuffer.mark(0); 2415 if (valBuffer.getPosition() != keyLength) 2416 throw new IOException(key + " read " + valBuffer.getPosition() 2417 + " bytes, should read " + keyLength); 2418 } else { 2419 //Reset syncSeen 2420 syncSeen = false; 2421 2422 if (noBufferedKeys == 0) { 2423 try { 2424 readBlock(); 2425 } catch (EOFException eof) { 2426 return null; 2427 } 2428 } 2429 2430 int keyLength = WritableUtils.readVInt(keyLenIn); 2431 2432 // Sanity check 2433 if (keyLength < 0) { 2434 return null; 2435 } 2436 2437 //Read another compressed 'key' 2438 key = deserializeKey(key); 2439 --noBufferedKeys; 2440 } 2441 2442 return key; 2443 } 2444 2445 @SuppressWarnings("unchecked") 2446 private Object deserializeKey(Object key) throws IOException { 2447 return keyDeserializer.deserialize(key); 2448 } 2449 2450 /** 2451 * Read 'raw' values. 2452 * @param val - The 'raw' value 2453 * @return the value length 2454 * @throws IOException 2455 */ 2456 public synchronized int nextRawValue(ValueBytes val) 2457 throws IOException { 2458 2459 // Position stream to current value 2460 seekToCurrentValue(); 2461 2462 if (!blockCompressed) { 2463 int valLength = recordLength - keyLength; 2464 if (decompress) { 2465 CompressedBytes value = (CompressedBytes)val; 2466 value.reset(in, valLength); 2467 } else { 2468 UncompressedBytes value = (UncompressedBytes)val; 2469 value.reset(in, valLength); 2470 } 2471 2472 return valLength; 2473 } else { 2474 int valLength = WritableUtils.readVInt(valLenIn); 2475 UncompressedBytes rawValue = (UncompressedBytes)val; 2476 rawValue.reset(valIn, valLength); 2477 --noBufferedValues; 2478 return valLength; 2479 } 2480 2481 } 2482 2483 private void handleChecksumException(ChecksumException e) 2484 throws IOException { 2485 if (this.conf.getBoolean("io.skip.checksum.errors", false)) { 2486 LOG.warn("Bad checksum at "+getPosition()+". Skipping entries."); 2487 sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512)); 2488 } else { 2489 throw e; 2490 } 2491 } 2492 2493 /** Disables sync checking; often invoked for temporary files. */ 2494 synchronized void ignoreSync() { 2495 sync = null; 2496 } 2497 2498 /** Set the current byte position in the input file. 2499 * 2500 * <p>The position passed must be a position returned by {@link 2501 * SequenceFile.Writer#getLength()} when writing this file. To seek to an arbitrary 2502 * position, use {@link SequenceFile.Reader#sync(long)}.
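 *
 * <p>For example (an illustrative sketch; <code>offset</code> is assumed to
 * have been recorded from {@link SequenceFile.Writer#getLength()} while the
 * file was written, and <code>arbitraryPos</code> is any byte position):
 * <pre>
 *   reader.seek(offset);        // jump to an exact record boundary
 *   reader.sync(arbitraryPos);  // or advance to the next sync point
 * </pre>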
2503 */ 2504 public synchronized void seek(long position) throws IOException { 2505 in.seek(position); 2506 if (blockCompressed) { // trigger block read 2507 noBufferedKeys = 0; 2508 valuesDecompressed = true; 2509 } 2510 } 2511 2512 /** Seek to the next sync mark past a given position.*/ 2513 public synchronized void sync(long position) throws IOException { 2514 if (position+SYNC_SIZE >= end) { 2515 seek(end); 2516 return; 2517 } 2518 2519 if (position < headerEnd) { 2520 // seek directly to first record 2521 in.seek(headerEnd); 2522 // note the sync marker "seen" in the header 2523 syncSeen = true; 2524 return; 2525 } 2526 2527 try { 2528 seek(position+4); // skip escape 2529 in.readFully(syncCheck); 2530 int syncLen = sync.length; 2531 for (int i = 0; in.getPos() < end; i++) { 2532 int j = 0; 2533 for (; j < syncLen; j++) { 2534 if (sync[j] != syncCheck[(i+j)%syncLen]) 2535 break; 2536 } 2537 if (j == syncLen) { 2538 in.seek(in.getPos() - SYNC_SIZE); // position before sync 2539 return; 2540 } 2541 syncCheck[i%syncLen] = in.readByte(); 2542 } 2543 } catch (ChecksumException e) { // checksum failure 2544 handleChecksumException(e); 2545 } 2546 } 2547 2548 /** Returns true iff the previous call to next passed a sync mark.*/ 2549 public synchronized boolean syncSeen() { return syncSeen; } 2550 2551 /** Return the current byte position in the input file. */ 2552 public synchronized long getPosition() throws IOException { 2553 return in.getPos(); 2554 } 2555 2556 /** Returns the name of the file. */ 2557 public String toString() { 2558 return filename; 2559 } 2560 2561 } 2562 2563 /** Sorts key/value pairs in a sequence-format file. 2564 * 2565 * <p>For best performance, applications should make sure that the {@link 2566 * Writable#readFields(DataInput)} implementation of their keys is 2567 * very efficient. In particular, it should avoid allocating memory. 2568 */ 2569 public static class Sorter { 2570 2571 private RawComparator comparator; 2572 2573 private MergeSort mergeSort; //the implementation of merge sort 2574 2575 private Path[] inFiles; // when merging or sorting 2576 2577 private Path outFile; 2578 2579 private int memory; // bytes 2580 private int factor; // merged per pass 2581 2582 private FileSystem fs = null; 2583 2584 private Class keyClass; 2585 private Class valClass; 2586 2587 private Configuration conf; 2588 private Metadata metadata; 2589 2590 private Progressable progressable = null; 2591 2592 /** Sort and merge files containing the named classes. */ 2593 public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, 2594 Class valClass, Configuration conf) { 2595 this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf); 2596 } 2597 2598 /** Sort and merge using an arbitrary {@link RawComparator}. */ 2599 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2600 Class valClass, Configuration conf) { 2601 this(fs, comparator, keyClass, valClass, conf, new Metadata()); 2602 } 2603 2604 /** Sort and merge using an arbitrary {@link RawComparator}. 
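 * <p>For example (an illustrative sketch; the key/value classes, comparator
 * and paths are assumptions):
 * <pre>
 *   SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
 *       new Text.Comparator(), Text.class, IntWritable.class, conf,
 *       new Metadata());
 *   sorter.sort(new Path[]{input}, output, false);
 * </pre>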
*/ 2605 public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 2606 Class valClass, Configuration conf, Metadata metadata) { 2607 this.fs = fs; 2608 this.comparator = comparator; 2609 this.keyClass = keyClass; 2610 this.valClass = valClass; 2611 this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024; 2612 this.factor = conf.getInt("io.sort.factor", 100); 2613 this.conf = conf; 2614 this.metadata = metadata; 2615 } 2616 2617 /** Set the number of streams to merge at once.*/ 2618 public void setFactor(int factor) { this.factor = factor; } 2619 2620 /** Get the number of streams to merge at once.*/ 2621 public int getFactor() { return factor; } 2622 2623 /** Set the total amount of buffer memory, in bytes.*/ 2624 public void setMemory(int memory) { this.memory = memory; } 2625 2626 /** Get the total amount of buffer memory, in bytes.*/ 2627 public int getMemory() { return memory; } 2628 2629 /** Set the progressable object in order to report progress. */ 2630 public void setProgressable(Progressable progressable) { 2631 this.progressable = progressable; 2632 } 2633 2634 /** 2635 * Perform a file sort from a set of input files into an output file. 2636 * @param inFiles the files to be sorted 2637 * @param outFile the sorted output file 2638 * @param deleteInput should the input files be deleted as they are read? 2639 */ 2640 public void sort(Path[] inFiles, Path outFile, 2641 boolean deleteInput) throws IOException { 2642 if (fs.exists(outFile)) { 2643 throw new IOException("already exists: " + outFile); 2644 } 2645 2646 this.inFiles = inFiles; 2647 this.outFile = outFile; 2648 2649 int segments = sortPass(deleteInput); 2650 if (segments > 1) { 2651 mergePass(outFile.getParent()); 2652 } 2653 } 2654 2655 /** 2656 * Perform a file sort from a set of input files and return an iterator. 2657 * @param inFiles the files to be sorted 2658 * @param tempDir the directory where temp files are created during sort 2659 * @param deleteInput should the input files be deleted as they are read? 2660 * @return the RawKeyValueIterator over the sorted data 2661 */ 2662 public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 2663 boolean deleteInput) throws IOException { 2664 Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2"); 2665 if (fs.exists(outFile)) { 2666 throw new IOException("already exists: " + outFile); 2667 } 2668 this.inFiles = inFiles; 2669 //outFile will basically be used as prefix for temp files in the cases 2670 //where sort outputs multiple sorted segments. For the single segment 2671 //case, the outputFile itself will contain the sorted data for that 2672 //segment 2673 this.outFile = outFile; 2674 2675 int segments = sortPass(deleteInput); 2676 if (segments > 1) 2677 return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 2678 tempDir); 2679 else if (segments == 1) 2680 return merge(new Path[]{outFile}, true, tempDir); 2681 else return null; 2682 } 2683 2684 /** 2685 * The backwards compatible interface to sort.
2686 * @param inFile the input file to sort 2687 * @param outFile the sorted output file 2688 */ 2689 public void sort(Path inFile, Path outFile) throws IOException { 2690 sort(new Path[]{inFile}, outFile, false); 2691 } 2692 2693 private int sortPass(boolean deleteInput) throws IOException { 2694 if(LOG.isDebugEnabled()) { 2695 LOG.debug("running sort pass"); 2696 } 2697 SortPass sortPass = new SortPass(); // make the SortPass 2698 sortPass.setProgressable(progressable); 2699 mergeSort = new MergeSort(sortPass.new SeqFileComparator()); 2700 try { 2701 return sortPass.run(deleteInput); // run it 2702 } finally { 2703 sortPass.close(); // close it 2704 } 2705 } 2706 2707 private class SortPass { 2708 private int memoryLimit = memory/4; 2709 private int recordLimit = 1000000; 2710 2711 private DataOutputBuffer rawKeys = new DataOutputBuffer(); 2712 private byte[] rawBuffer; 2713 2714 private int[] keyOffsets = new int[1024]; 2715 private int[] pointers = new int[keyOffsets.length]; 2716 private int[] pointersCopy = new int[keyOffsets.length]; 2717 private int[] keyLengths = new int[keyOffsets.length]; 2718 private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length]; 2719 2720 private ArrayList segmentLengths = new ArrayList(); 2721 2722 private Reader in = null; 2723 private FSDataOutputStream out = null; 2724 private FSDataOutputStream indexOut = null; 2725 private Path outName; 2726 2727 private Progressable progressable = null; 2728 2729 public int run(boolean deleteInput) throws IOException { 2730 int segments = 0; 2731 int currentFile = 0; 2732 boolean atEof = (currentFile >= inFiles.length); 2733 CompressionType compressionType; 2734 CompressionCodec codec = null; 2735 segmentLengths.clear(); 2736 if (atEof) { 2737 return 0; 2738 } 2739 2740 // Initialize 2741 in = new Reader(fs, inFiles[currentFile], conf); 2742 compressionType = in.getCompressionType(); 2743 codec = in.getCompressionCodec(); 2744 2745 for (int i=0; i < rawValues.length; ++i) { 2746 rawValues[i] = null; 2747 } 2748 2749 while (!atEof) { 2750 int count = 0; 2751 int bytesProcessed = 0; 2752 rawKeys.reset(); 2753 while (!atEof && 2754 bytesProcessed < memoryLimit && count < recordLimit) { 2755 2756 // Read a record into buffer 2757 // Note: Attempt to re-use 'rawValue' as far as possible 2758 int keyOffset = rawKeys.getLength(); 2759 ValueBytes rawValue = 2760 (count == keyOffsets.length || rawValues[count] == null) ? 
2761 in.createValueBytes() : 2762 rawValues[count]; 2763 int recordLength = in.nextRaw(rawKeys, rawValue); 2764 if (recordLength == -1) { 2765 in.close(); 2766 if (deleteInput) { 2767 fs.delete(inFiles[currentFile], true); 2768 } 2769 currentFile += 1; 2770 atEof = currentFile >= inFiles.length; 2771 if (!atEof) { 2772 in = new Reader(fs, inFiles[currentFile], conf); 2773 } else { 2774 in = null; 2775 } 2776 continue; 2777 } 2778 2779 int keyLength = rawKeys.getLength() - keyOffset; 2780 2781 if (count == keyOffsets.length) 2782 grow(); 2783 2784 keyOffsets[count] = keyOffset; // update pointers 2785 pointers[count] = count; 2786 keyLengths[count] = keyLength; 2787 rawValues[count] = rawValue; 2788 2789 bytesProcessed += recordLength; 2790 count++; 2791 } 2792 2793 // buffer is full -- sort & flush it 2794 if(LOG.isDebugEnabled()) { 2795 LOG.debug("flushing segment " + segments); 2796 } 2797 rawBuffer = rawKeys.getData(); 2798 sort(count); 2799 // indicate we're making progress 2800 if (progressable != null) { 2801 progressable.progress(); 2802 } 2803 flush(count, bytesProcessed, compressionType, codec, 2804 segments==0 && atEof); 2805 segments++; 2806 } 2807 return segments; 2808 } 2809 2810 public void close() throws IOException { 2811 if (in != null) { 2812 in.close(); 2813 } 2814 if (out != null) { 2815 out.close(); 2816 } 2817 if (indexOut != null) { 2818 indexOut.close(); 2819 } 2820 } 2821 2822 private void grow() { 2823 int newLength = keyOffsets.length * 3 / 2; 2824 keyOffsets = grow(keyOffsets, newLength); 2825 pointers = grow(pointers, newLength); 2826 pointersCopy = new int[newLength]; 2827 keyLengths = grow(keyLengths, newLength); 2828 rawValues = grow(rawValues, newLength); 2829 } 2830 2831 private int[] grow(int[] old, int newLength) { 2832 int[] result = new int[newLength]; 2833 System.arraycopy(old, 0, result, 0, old.length); 2834 return result; 2835 } 2836 2837 private ValueBytes[] grow(ValueBytes[] old, int newLength) { 2838 ValueBytes[] result = new ValueBytes[newLength]; 2839 System.arraycopy(old, 0, result, 0, old.length); 2840 for (int i=old.length; i < newLength; ++i) { 2841 result[i] = null; 2842 } 2843 return result; 2844 } 2845 2846 private void flush(int count, int bytesProcessed, 2847 CompressionType compressionType, 2848 CompressionCodec codec, 2849 boolean done) throws IOException { 2850 if (out == null) { 2851 outName = done ? outFile : outFile.suffix(".0"); 2852 out = fs.create(outName); 2853 if (!done) { 2854 indexOut = fs.create(outName.suffix(".index")); 2855 } 2856 } 2857 2858 long segmentStart = out.getPos(); 2859 Writer writer = createWriter(conf, Writer.stream(out), 2860 Writer.keyClass(keyClass), Writer.valueClass(valClass), 2861 Writer.compression(compressionType, codec), 2862 Writer.metadata(done ? 
metadata : new Metadata())); 2863 2864 if (!done) { 2865 writer.sync = null; // disable sync on temp files 2866 } 2867 2868 for (int i = 0; i < count; i++) { // write in sorted order 2869 int p = pointers[i]; 2870 writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]); 2871 } 2872 writer.close(); 2873 2874 if (!done) { 2875 // Save the segment length 2876 WritableUtils.writeVLong(indexOut, segmentStart); 2877 WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart)); 2878 indexOut.flush(); 2879 } 2880 } 2881 2882 private void sort(int count) { 2883 System.arraycopy(pointers, 0, pointersCopy, 0, count); 2884 mergeSort.mergeSort(pointersCopy, pointers, 0, count); 2885 } 2886 class SeqFileComparator implements Comparator<IntWritable> { 2887 public int compare(IntWritable I, IntWritable J) { 2888 return comparator.compare(rawBuffer, keyOffsets[I.get()], 2889 keyLengths[I.get()], rawBuffer, 2890 keyOffsets[J.get()], keyLengths[J.get()]); 2891 } 2892 } 2893 2894 /** Set the progressable object in order to report progress. */ 2895 public void setProgressable(Progressable progressable) 2896 { 2897 this.progressable = progressable; 2898 } 2899 2900 } // SequenceFile.Sorter.SortPass 2901 2902 /** The interface to iterate over raw keys/values of SequenceFiles. */ 2903 public static interface RawKeyValueIterator { 2904 /** Gets the current raw key 2905 * @return DataOutputBuffer 2906 * @throws IOException 2907 */ 2908 DataOutputBuffer getKey() throws IOException; 2909 /** Gets the current raw value 2910 * @return ValueBytes 2911 * @throws IOException 2912 */ 2913 ValueBytes getValue() throws IOException; 2914 /** Sets up the current key and value (for getKey and getValue) 2915 * @return true if there exists a key/value, false otherwise 2916 * @throws IOException 2917 */ 2918 boolean next() throws IOException; 2919 /** closes the iterator so that the underlying streams can be closed 2920 * @throws IOException 2921 */ 2922 void close() throws IOException; 2923 /** Gets the Progress object; this has a float (0.0 - 1.0) 2924 * indicating the bytes processed by the iterator so far 2925 */ 2926 Progress getProgress(); 2927 } 2928 2929 /** 2930 * Merges the list of segments of type <code>SegmentDescriptor</code> 2931 * @param segments the list of SegmentDescriptors 2932 * @param tmpDir the directory to write temporary files into 2933 * @return RawKeyValueIterator 2934 * @throws IOException 2935 */ 2936 public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 2937 Path tmpDir) 2938 throws IOException { 2939 // pass in object to report progress, if present 2940 MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); 2941 return mQueue.merge(); 2942 } 2943 2944 /** 2945 * Merges the contents of files passed in Path[] using a max factor value 2946 * that is already set 2947 * @param inNames the array of path names 2948 * @param deleteInputs true if the input files should be deleted when 2949 * unnecessary 2950 * @param tmpDir the directory to write temporary files into 2951 * @return RawKeyValueIterator 2952 * @throws IOException 2953 */ 2954 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 2955 Path tmpDir) 2956 throws IOException { 2957 return merge(inNames, deleteInputs, 2958 (inNames.length < factor) ?
inNames.length : factor, 2959 tmpDir); 2960 } 2961 2962 /** 2963 * Merges the contents of files passed in Path[] 2964 * @param inNames the array of path names 2965 * @param deleteInputs true if the input files should be deleted when 2966 * unnecessary 2967 * @param factor the factor that will be used as the maximum merge fan-in 2968 * @param tmpDir the directory to write temporary files into 2969 * @return RawKeyValueIterator 2970 * @throws IOException 2971 */ 2972 public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs, 2973 int factor, Path tmpDir) 2974 throws IOException { 2975 //get the segments from inNames 2976 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 2977 for (int i = 0; i < inNames.length; i++) { 2978 SegmentDescriptor s = new SegmentDescriptor(0, 2979 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 2980 s.preserveInput(!deleteInputs); 2981 s.doSync(); 2982 a.add(s); 2983 } 2984 this.factor = factor; 2985 MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); 2986 return mQueue.merge(); 2987 } 2988 2989 /** 2990 * Merges the contents of files passed in Path[] 2991 * @param inNames the array of path names 2992 * @param tempDir the directory for creating temp files during merge 2993 * @param deleteInputs true if the input files should be deleted when 2994 * unnecessary 2995 * @return RawKeyValueIterator 2996 * @throws IOException 2997 */ 2998 public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 2999 boolean deleteInputs) 3000 throws IOException { 3001 //outFile will basically be used as prefix for temp files for the 3002 //intermediate merge outputs 3003 this.outFile = new Path(tempDir + Path.SEPARATOR + "merged"); 3004 //get the segments from inNames 3005 ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>(); 3006 for (int i = 0; i < inNames.length; i++) { 3007 SegmentDescriptor s = new SegmentDescriptor(0, 3008 fs.getFileStatus(inNames[i]).getLen(), inNames[i]); 3009 s.preserveInput(!deleteInputs); 3010 s.doSync(); 3011 a.add(s); 3012 } 3013 factor = (inNames.length < factor) ?
inNames.length : factor; 3014 // pass in object to report progress, if present 3015 MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); 3016 return mQueue.merge(); 3017 } 3018 3019 /** 3020 * Clones the attributes (like compression) of the input file and creates a 3021 * corresponding Writer. 3022 * @param inputFile the path of the input file whose attributes should be 3023 * cloned 3024 * @param outputFile the path of the output file 3025 * @param prog the Progressable to report status during the file write 3026 * @return Writer 3027 * @throws IOException 3028 */ 3029 public Writer cloneFileAttributes(Path inputFile, Path outputFile, 3030 Progressable prog) throws IOException { 3031 Reader reader = new Reader(conf, 3032 Reader.file(inputFile), 3033 new Reader.OnlyHeaderOption()); 3034 CompressionType compress = reader.getCompressionType(); 3035 CompressionCodec codec = reader.getCompressionCodec(); 3036 reader.close(); 3037 3038 Writer writer = createWriter(conf, 3039 Writer.file(outputFile), 3040 Writer.keyClass(keyClass), 3041 Writer.valueClass(valClass), 3042 Writer.compression(compress, codec), 3043 Writer.progressable(prog)); 3044 return writer; 3045 } 3046 3047 /** 3048 * Writes records from RawKeyValueIterator into a file represented by the 3049 * passed writer 3050 * @param records the RawKeyValueIterator 3051 * @param writer the Writer created earlier 3052 * @throws IOException 3053 */ 3054 public void writeFile(RawKeyValueIterator records, Writer writer) 3055 throws IOException { 3056 while(records.next()) { 3057 writer.appendRaw(records.getKey().getData(), 0, 3058 records.getKey().getLength(), records.getValue()); 3059 } 3060 writer.sync(); 3061 } 3062 3063 /** Merge the provided files. 3064 * @param inFiles the array of input path names 3065 * @param outFile the final output file 3066 * @throws IOException 3067 */ 3068 public void merge(Path[] inFiles, Path outFile) throws IOException { 3069 if (fs.exists(outFile)) { 3070 throw new IOException("already exists: " + outFile); 3071 } 3072 RawKeyValueIterator r = merge(inFiles, false, outFile.getParent()); 3073 Writer writer = cloneFileAttributes(inFiles[0], outFile, null); 3074 3075 writeFile(r, writer); 3076 3077 writer.close(); 3078 } 3079 3080 /** sort calls this to generate the final merged output */ 3081 private int mergePass(Path tmpDir) throws IOException { 3082 if(LOG.isDebugEnabled()) { 3083 LOG.debug("running merge pass"); 3084 } 3085 Writer writer = cloneFileAttributes( 3086 outFile.suffix(".0"), outFile, null); 3087 RawKeyValueIterator r = merge(outFile.suffix(".0"), 3088 outFile.suffix(".0.index"), tmpDir); 3089 writeFile(r, writer); 3090 3091 writer.close(); 3092 return 0; 3093 } 3094 3095 /** Used by mergePass to merge the output of the sort 3096 * @param inName the name of the input file containing sorted segments 3097 * @param indexIn the offsets of the sorted segments 3098 * @param tmpDir the relative directory to store intermediate results in 3099 * @return RawKeyValueIterator 3100 * @throws IOException 3101 */ 3102 private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 3103 throws IOException { 3104 //get the segments from indexIn 3105 //we create a SegmentContainer so that we can track segments belonging to 3106 //inName and delete inName as soon as we see that we have looked at all 3107 //the contained segments during the merge process & hence don't need 3108 //them anymore 3109 SegmentContainer container = new SegmentContainer(inName, indexIn); 3110 MergeQueue mQueue = new
MergeQueue(container.getSegmentList(), tmpDir, progressable); 3111 return mQueue.merge(); 3112 } 3113 3114 /** This class implements the core of the merge logic */ 3115 private class MergeQueue extends PriorityQueue 3116 implements RawKeyValueIterator { 3117 private boolean compress; 3118 private boolean blockCompress; 3119 private DataOutputBuffer rawKey = new DataOutputBuffer(); 3120 private ValueBytes rawValue; 3121 private long totalBytesProcessed; 3122 private float progPerByte; 3123 private Progress mergeProgress = new Progress(); 3124 private Path tmpDir; 3125 private Progressable progress = null; //handle to the progress reporting object 3126 private SegmentDescriptor minSegment; 3127 3128 //a TreeMap used to store the segments sorted by size (segment offset and 3129 //segment path name are used to break ties between segments of the same size) 3130 private Map<SegmentDescriptor, Void> sortedSegmentSizes = 3131 new TreeMap<SegmentDescriptor, Void>(); 3132 3133 @SuppressWarnings("unchecked") 3134 public void put(SegmentDescriptor stream) throws IOException { 3135 if (size() == 0) { 3136 compress = stream.in.isCompressed(); 3137 blockCompress = stream.in.isBlockCompressed(); 3138 } else if (compress != stream.in.isCompressed() || 3139 blockCompress != stream.in.isBlockCompressed()) { 3140 throw new IOException("All merged files must have the same compression setting."); 3141 } 3142 super.put(stream); 3143 } 3144 3145 /** 3146 * A queue of file segments to merge 3147 * @param segments the file segments to merge 3148 * @param tmpDir a relative local directory to save intermediate files in 3149 * @param progress the reference to the Progressable object 3150 */ 3151 public MergeQueue(List <SegmentDescriptor> segments, 3152 Path tmpDir, Progressable progress) { 3153 int size = segments.size(); 3154 for (int i = 0; i < size; i++) { 3155 sortedSegmentSizes.put(segments.get(i), null); 3156 } 3157 this.tmpDir = tmpDir; 3158 this.progress = progress; 3159 } 3160 protected boolean lessThan(Object a, Object b) { 3161 // indicate we're making progress 3162 if (progress != null) { 3163 progress.progress(); 3164 } 3165 SegmentDescriptor msa = (SegmentDescriptor)a; 3166 SegmentDescriptor msb = (SegmentDescriptor)b; 3167 return comparator.compare(msa.getKey().getData(), 0, 3168 msa.getKey().getLength(), msb.getKey().getData(), 0, 3169 msb.getKey().getLength()) < 0; 3170 } 3171 public void close() throws IOException { 3172 SegmentDescriptor ms; // close inputs 3173 while ((ms = (SegmentDescriptor)pop()) != null) { 3174 ms.cleanup(); 3175 } 3176 minSegment = null; 3177 } 3178 public DataOutputBuffer getKey() throws IOException { 3179 return rawKey; 3180 } 3181 public ValueBytes getValue() throws IOException { 3182 return rawValue; 3183 } 3184 public boolean next() throws IOException { 3185 if (size() == 0) 3186 return false; 3187 if (minSegment != null) { 3188 //minSegment is non-null for all invocations of next except the first 3189 //one. For the first invocation, the priority queue is ready for use 3190 //but for the subsequent invocations, first adjust the queue 3191 adjustPriorityQueue(minSegment); 3192 if (size() == 0) { 3193 minSegment = null; 3194 return false; 3195 } 3196 } 3197 minSegment = (SegmentDescriptor)top(); 3198 long startPos = minSegment.in.getPosition(); // Current position in stream 3199 //save the raw key reference 3200 rawKey = minSegment.getKey(); 3201 //load the raw value.
Re-use the existing rawValue buffer 3202 if (rawValue == null) { 3203 rawValue = minSegment.in.createValueBytes(); 3204 } 3205 minSegment.nextRawValue(rawValue); 3206 long endPos = minSegment.in.getPosition(); // End position after reading value 3207 updateProgress(endPos - startPos); 3208 return true; 3209 } 3210 3211 public Progress getProgress() { 3212 return mergeProgress; 3213 } 3214 3215 private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{ 3216 long startPos = ms.in.getPosition(); // Current position in stream 3217 boolean hasNext = ms.nextRawKey(); 3218 long endPos = ms.in.getPosition(); // End position after reading key 3219 updateProgress(endPos - startPos); 3220 if (hasNext) { 3221 adjustTop(); 3222 } else { 3223 pop(); 3224 ms.cleanup(); 3225 } 3226 } 3227 3228 private void updateProgress(long bytesProcessed) { 3229 totalBytesProcessed += bytesProcessed; 3230 if (progPerByte > 0) { 3231 mergeProgress.set(totalBytesProcessed * progPerByte); 3232 } 3233 } 3234 3235 /** This is the single level merge that is called multiple times 3236 * depending on the factor size and the number of segments 3237 * @return RawKeyValueIterator 3238 * @throws IOException 3239 */ 3240 public RawKeyValueIterator merge() throws IOException { 3241 //create the MergeStreams from the sorted map created in the constructor 3242 //and dump the final output to a file 3243 int numSegments = sortedSegmentSizes.size(); 3244 int origFactor = factor; 3245 int passNo = 1; 3246 LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir"); 3247 do { 3248 //get the factor for this pass of merge 3249 factor = getPassFactor(passNo, numSegments); 3250 List<SegmentDescriptor> segmentsToMerge = 3251 new ArrayList<SegmentDescriptor>(); 3252 int segmentsConsidered = 0; 3253 int numSegmentsToConsider = factor; 3254 while (true) { 3255 //extract the smallest 'factor' number of segment pointers from the 3256 //TreeMap. Call cleanup on the empty segments (no key/value data) 3257 SegmentDescriptor[] mStream = 3258 getSegmentDescriptors(numSegmentsToConsider); 3259 for (int i = 0; i < mStream.length; i++) { 3260 if (mStream[i].nextRawKey()) { 3261 segmentsToMerge.add(mStream[i]); 3262 segmentsConsidered++; 3263 // Count the fact that we read some bytes in calling nextRawKey() 3264 updateProgress(mStream[i].in.getPosition()); 3265 } 3266 else { 3267 mStream[i].cleanup(); 3268 numSegments--; //we ignore this segment for the merge 3269 } 3270 } 3271 //if we have the desired number of segments 3272 //or looked at all available segments, we break 3273 if (segmentsConsidered == factor || 3274 sortedSegmentSizes.size() == 0) { 3275 break; 3276 } 3277 3278 numSegmentsToConsider = factor - segmentsConsidered; 3279 } 3280 //feed the streams to the priority queue 3281 initialize(segmentsToMerge.size()); clear(); 3282 for (int i = 0; i < segmentsToMerge.size(); i++) { 3283 put(segmentsToMerge.get(i)); 3284 } 3285 //if the number of remaining segments is within the merge factor, just 3286 //return the iterator; else do another single-level merge 3287 if (numSegments <= factor) { 3288 //calculate the length of the remaining segments.
Required for 3289 //calculating the merge progress 3290 long totalBytes = 0; 3291 for (int i = 0; i < segmentsToMerge.size(); i++) { 3292 totalBytes += segmentsToMerge.get(i).segmentLength; 3293 } 3294 if (totalBytes != 0) //being paranoid 3295 progPerByte = 1.0f / (float)totalBytes; 3296 //reset factor to what it originally was 3297 factor = origFactor; 3298 return this; 3299 } else { 3300 //we want to spread the creation of temp files on multiple disks if 3301 //available under the space constraints 3302 long approxOutputSize = 0; 3303 for (SegmentDescriptor s : segmentsToMerge) { 3304 approxOutputSize += s.segmentLength + 3305 ChecksumFileSystem.getApproxChkSumLength( 3306 s.segmentLength); 3307 } 3308 Path tmpFilename = 3309 new Path(tmpDir, "intermediate").suffix("." + passNo); 3310 3311 Path outputFile = lDirAlloc.getLocalPathForWrite( 3312 tmpFilename.toString(), 3313 approxOutputSize, conf); 3314 if(LOG.isDebugEnabled()) { 3315 LOG.debug("writing intermediate results to " + outputFile); 3316 } 3317 Writer writer = cloneFileAttributes( 3318 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 3319 fs.makeQualified(outputFile), null); 3320 writer.sync = null; //disable sync for temp files 3321 writeFile(this, writer); 3322 writer.close(); 3323 3324 //we finished one single level merge; now clean up the priority 3325 //queue 3326 this.close(); 3327 3328 SegmentDescriptor tempSegment = 3329 new SegmentDescriptor(0, 3330 fs.getFileStatus(outputFile).getLen(), outputFile); 3331 //put the segment back in the TreeMap 3332 sortedSegmentSizes.put(tempSegment, null); 3333 numSegments = sortedSegmentSizes.size(); 3334 passNo++; 3335 } 3336 //we are worried about only the first pass merge factor. So reset the 3337 //factor to what it originally was 3338 factor = origFactor; 3339 } while(true); 3340 } 3341 3342 /** Determines the number of segments to merge in a given pass (HADOOP-591). Except for the first pass, every pass merges exactly <code>factor</code> segments; the first pass merges just enough segments to make (numSegments - 1) divisible by (factor - 1), so that all subsequent passes are full merges of <code>factor</code> segments. */ 3343 public int getPassFactor(int passNo, int numSegments) { 3344 if (passNo > 1 || numSegments <= factor || factor == 1) 3345 return factor; 3346 int mod = (numSegments - 1) % (factor - 1); 3347 if (mod == 0) 3348 return factor; 3349 return mod + 1; 3350 } 3351 3352 /** Return (& remove) the requested number of segment descriptors from the 3353 * sorted map. 3354 */ 3355 public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) { 3356 if (numDescriptors > sortedSegmentSizes.size()) 3357 numDescriptors = sortedSegmentSizes.size(); 3358 SegmentDescriptor[] SegmentDescriptors = 3359 new SegmentDescriptor[numDescriptors]; 3360 Iterator iter = sortedSegmentSizes.keySet().iterator(); 3361 int i = 0; 3362 while (i < numDescriptors) { 3363 SegmentDescriptors[i++] = (SegmentDescriptor)iter.next(); 3364 iter.remove(); 3365 } 3366 return SegmentDescriptors; 3367 } 3368 } // SequenceFile.Sorter.MergeQueue 3369 3370 /** This class defines a merge segment. This class can be subclassed to 3371 * provide a customized cleanup method implementation. In this 3372 * implementation, cleanup closes the file handle and deletes the file 3373 */ 3374 public class SegmentDescriptor implements Comparable { 3375 3376 long segmentOffset; //the start of the segment in the file 3377 long segmentLength; //the length of the segment 3378 Path segmentPathName; //the path name of the file containing the segment 3379 boolean ignoreSync = true; //skip sync checks by default; doSync() enables them 3380 private Reader in = null; 3381 private DataOutputBuffer rawKey = null; //this will hold the current key 3382 private boolean preserveInput = false; //if false, input segment files are deleted when no longer needed
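      // An illustrative sketch of direct use (SegmentDescriptor is an inner
      // class of Sorter, hence the "sorter.new" syntax; the path is an
      // assumption):
      //
      //   Sorter.SegmentDescriptor seg = sorter.new SegmentDescriptor(
      //       0, fs.getFileStatus(segPath).getLen(), segPath);
      //   seg.doSync();             // enable sync checks for a real file
      //   seg.preserveInput(true);  // keep the file after merging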
3383 3384 /** Constructs a segment 3385 * @param segmentOffset the offset of the segment in the file 3386 * @param segmentLength the length of the segment 3387 * @param segmentPathName the path name of the file containing the segment 3388 */ 3389 public SegmentDescriptor (long segmentOffset, long segmentLength, 3390 Path segmentPathName) { 3391 this.segmentOffset = segmentOffset; 3392 this.segmentLength = segmentLength; 3393 this.segmentPathName = segmentPathName; 3394 } 3395 3396 /** Enables sync checks when reading this segment (they are skipped by default, as for temporary files). */ 3397 public void doSync() {ignoreSync = false;} 3398 3399 /** Sets whether the input segment file should be preserved, i.e., not deleted when it is no longer needed. */ 3400 public void preserveInput(boolean preserve) { 3401 preserveInput = preserve; 3402 } 3403 3404 public boolean shouldPreserveInput() { 3405 return preserveInput; 3406 } 3407 3408 public int compareTo(Object o) { 3409 SegmentDescriptor that = (SegmentDescriptor)o; 3410 if (this.segmentLength != that.segmentLength) { 3411 return (this.segmentLength < that.segmentLength ? -1 : 1); 3412 } 3413 if (this.segmentOffset != that.segmentOffset) { 3414 return (this.segmentOffset < that.segmentOffset ? -1 : 1); 3415 } 3416 return (this.segmentPathName.toString()). 3417 compareTo(that.segmentPathName.toString()); 3418 } 3419 3420 public boolean equals(Object o) { 3421 if (!(o instanceof SegmentDescriptor)) { 3422 return false; 3423 } 3424 SegmentDescriptor that = (SegmentDescriptor)o; 3425 if (this.segmentLength == that.segmentLength && 3426 this.segmentOffset == that.segmentOffset && 3427 this.segmentPathName.toString().equals( 3428 that.segmentPathName.toString())) { 3429 return true; 3430 } 3431 return false; 3432 } 3433 3434 public int hashCode() { 3435 return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32)); 3436 } 3437 3438 /** Fills up the rawKey object with the key returned by the Reader 3439 * @return true if there is a key returned; false, otherwise 3440 * @throws IOException 3441 */ 3442 public boolean nextRawKey() throws IOException { 3443 if (in == null) { 3444 int bufferSize = getBufferSize(conf); 3445 Reader reader = new Reader(conf, 3446 Reader.file(segmentPathName), 3447 Reader.bufferSize(bufferSize), 3448 Reader.start(segmentOffset), 3449 Reader.length(segmentLength)); 3450 3451 //sometimes we ignore syncs especially for temp merge files 3452 if (ignoreSync) reader.ignoreSync(); 3453 3454 if (reader.getKeyClass() != keyClass) 3455 throw new IOException("wrong key class: " + reader.getKeyClass() + 3456 " is not " + keyClass); 3457 if (reader.getValueClass() != valClass) 3458 throw new IOException("wrong value class: "+reader.getValueClass()+ 3459 " is not " + valClass); 3460 this.in = reader; 3461 rawKey = new DataOutputBuffer(); 3462 } 3463 rawKey.reset(); 3464 int keyLength = 3465 in.nextRawKey(rawKey); 3466 return (keyLength >= 0); 3467 } 3468 3469 /** Fills up the passed rawValue with the value corresponding to the key 3470 * read earlier 3471 * @param rawValue the buffer to fill with the value bytes 3472 * @return the length of the value 3473 * @throws IOException 3474 */ 3475 public int nextRawValue(ValueBytes rawValue) throws IOException { 3476 int valLength = in.nextRawValue(rawValue); 3477 return valLength; 3478 } 3479 3480 /** Returns the stored rawKey */ 3481 public DataOutputBuffer getKey() { 3482 return rawKey; 3483 } 3484 3485 /** Closes the underlying reader. */ 3486 private void close() throws IOException { 3487 this.in.close(); 3488 this.in = null; 3489 } 3490 3491 /** The default cleanup.
Subclasses can override this with a custom 3492 * cleanup 3493 */ 3494 public void cleanup() throws IOException { 3495 close(); 3496 if (!preserveInput) { 3497 fs.delete(segmentPathName, true); 3498 } 3499 } 3500 } // SequenceFile.Sorter.SegmentDescriptor 3501 3502 /** This class provisions multiple segments contained within a single 3503 * file 3504 */ 3505 private class LinkedSegmentsDescriptor extends SegmentDescriptor { 3506 3507 SegmentContainer parentContainer = null; 3508 3509 /** Constructs a segment 3510 * @param segmentOffset the offset of the segment in the file 3511 * @param segmentLength the length of the segment 3512 * @param segmentPathName the path name of the file containing the segment 3513 * @param parent the parent SegmentContainer that holds the segment 3514 */ 3515 public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 3516 Path segmentPathName, SegmentContainer parent) { 3517 super(segmentOffset, segmentLength, segmentPathName); 3518 this.parentContainer = parent; 3519 } 3520 /** The default cleanup. Subclasses can override this with a custom 3521 * cleanup 3522 */ 3523 public void cleanup() throws IOException { 3524 super.close(); 3525 if (super.shouldPreserveInput()) return; 3526 parentContainer.cleanup(); 3527 } 3528 3529 public boolean equals(Object o) { 3530 if (!(o instanceof LinkedSegmentsDescriptor)) { 3531 return false; 3532 } 3533 return super.equals(o); 3534 } 3535 } //SequenceFile.Sorter.LinkedSegmentsDescriptor 3536 3537 /** The class that defines a container for segments to be merged. Primarily 3538 * required to delete temp files as soon as all the contained segments 3539 * have been looked at */ 3540 private class SegmentContainer { 3541 private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups 3542 private int numSegmentsContained; //# of segments contained 3543 private Path inName; //input file from where segments are created 3544 3545 //the list of segments read from the file 3546 private ArrayList <SegmentDescriptor> segments = 3547 new ArrayList <SegmentDescriptor>(); 3548 /** This constructor is there primarily to serve the sort routine that 3549 * generates a single output file with an associated index file */ 3550 public SegmentContainer(Path inName, Path indexIn) throws IOException { 3551 //get the segments from indexIn 3552 FSDataInputStream fsIndexIn = fs.open(indexIn); 3553 long end = fs.getFileStatus(indexIn).getLen(); 3554 while (fsIndexIn.getPos() < end) { 3555 long segmentOffset = WritableUtils.readVLong(fsIndexIn); 3556 long segmentLength = WritableUtils.readVLong(fsIndexIn); 3557 Path segmentName = inName; 3558 segments.add(new LinkedSegmentsDescriptor(segmentOffset, 3559 segmentLength, segmentName, this)); 3560 } 3561 fsIndexIn.close(); 3562 fs.delete(indexIn, true); 3563 numSegmentsContained = segments.size(); 3564 this.inName = inName; 3565 } 3566 3567 public List <SegmentDescriptor> getSegmentList() { 3568 return segments; 3569 } 3570 public void cleanup() throws IOException { 3571 numSegmentsCleanedUp++; 3572 if (numSegmentsCleanedUp == numSegmentsContained) { 3573 fs.delete(inName, true); 3574 } 3575 } 3576 } //SequenceFile.Sorter.SegmentContainer 3577 3578 } // SequenceFile.Sorter 3579 3580} // SequenceFile
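/* Example: writing and re-reading a small SequenceFile end-to-end (an
 * illustrative sketch; the path, key/value types and records are
 * assumptions):
 *
 *   Configuration conf = new Configuration();
 *   Path file = new Path("/tmp/example.seq");
 *
 *   SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *       SequenceFile.Writer.file(file),
 *       SequenceFile.Writer.keyClass(Text.class),
 *       SequenceFile.Writer.valueClass(IntWritable.class));
 *   writer.append(new Text("apple"), new IntWritable(1));
 *   writer.close();
 *
 *   SequenceFile.Reader reader = new SequenceFile.Reader(conf,
 *       SequenceFile.Reader.file(file));
 *   Text key = new Text();
 *   IntWritable value = new IntWritable();
 *   while (reader.next(key, value)) {
 *     System.out.println(key + "\t" + value);
 *   }
 *   reader.close();
 */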