/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;
/**
 * <code>SequenceFile</code>s are flat files consisting of binary key/value
 * pairs.
 *
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 *
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks'
 *                                      separately and compressed. The size of
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 *
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 *
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
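 *
 * <p>For example, a writer and a reader might be created as follows. This is
 * a minimal sketch: the path, key/value types and record shown here are
 * illustrative, not part of any fixed contract.</p>
 *
 * <pre>
 *   Configuration conf = new Configuration();
 *   Path path = new Path("/tmp/example.seq");          // hypothetical path
 *
 *   // Write a few records using the recommended option-based factory.
 *   try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *            SequenceFile.Writer.file(path),
 *            SequenceFile.Writer.keyClass(Text.class),
 *            SequenceFile.Writer.valueClass(IntWritable.class),
 *            SequenceFile.Writer.compression(CompressionType.BLOCK))) {
 *     writer.append(new Text("apple"), new IntWritable(1));
 *   }
 *
 *   // Read them back; the Reader handles all three formats transparently.
 *   try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
 *            SequenceFile.Reader.file(path))) {
 *     Text key = new Text();
 *     IntWritable val = new IntWritable();
 *     while (reader.next(key, val)) {
 *       System.out.println(key + "\t" + val);
 *     }
 *   }
 * </pre>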
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 *
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.</p>
 *
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for
 *                       compression of keys and/or values (if compression is
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 *
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 *
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every block.
 * </li>
 * </ul>
 *
 * <p>The compressed blocks of key lengths and value lengths consist of the
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger
 * format.</p>
 *
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points.*/
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /**
   * The compression type used to compress key/value pairs in the
   * {@link SequenceFile}.
   *
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE,
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }
  /**
   * Get the compression type for the reduce outputs
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  public static CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD :
      CompressionType.valueOf(name);
  }

  /**
   * Set the default compression type for sequence files.
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  public static void setDefaultCompressionType(Configuration job,
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }
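
  // A minimal usage sketch (hedged): the default type is what createWriter
  // falls back to when no compression option is given. The configuration
  // value stored is the enum name, i.e. one of NONE, RECORD or BLOCK.
  //
  //   Configuration conf = new Configuration();
  //   SequenceFile.setDefaultCompressionType(conf, CompressionType.BLOCK);
  //   assert SequenceFile.getDefaultCompressionType(conf) == CompressionType.BLOCK;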

  /**
   * Create a new Writer with the given options.
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption =
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.bufferSize(bufferSize),
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
  createWriter(FileSystem fs, Configuration conf, Path name,
               Class keyClass, Class valClass, int bufferSize,
               short replication, long blockSize, boolean createParent,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
  createWriter(FileContext fc, Configuration conf, Path name,
               Class keyClass, Class valClass,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata,
               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
               throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out,
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream Stream to write uncompressed bytes to.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Writes the stored bytes to the outStream as-is.
     * Note: this method does NOT compress the bytes; they must already be
     * compressed.
     * @param outStream Stream to write compressed bytes to.
     */
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }
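
  // A hedged sketch of how 'raw' values are typically used: copying records
  // between files without deserializing them. The reader methods referenced
  // here (createValueBytes, nextRawKey, nextRawValue) are assumed from
  // SequenceFile.Reader; reader and writer are illustrative.
  //
  //   DataOutputBuffer rawKey = new DataOutputBuffer();
  //   ValueBytes rawValue = reader.createValueBytes();
  //   while (reader.nextRawKey(rawKey) != -1) {
  //     reader.nextRawValue(rawValue);
  //     writer.appendRaw(rawKey.getData(), 0, rawKey.getLength(), rawValue);
  //     rawKey.reset();
  //   }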

  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;

    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      throw
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes

  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }

    @Override
    public int getSize() {
      return dataSize;
    }

    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream)
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes

  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;

    public Metadata() {
      this(new TreeMap<Text, Text>());
    }

    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }

    public Text get(Text name) {
      return this.theMetadata.get(name);
    }

    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }

    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }

    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
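
  // A minimal sketch of attaching metadata at creation time (the key/value
  // strings and path are illustrative, not part of any fixed contract):
  //
  //   Metadata metadata = new Metadata();
  //   metadata.set(new Text("created-by"), new Text("example-job"));
  //   SequenceFile.createWriter(conf,
  //       Writer.file(path),
  //       Writer.keyClass(Text.class),
  //       Writer.valueClass(Text.class),
  //       Writer.metadata(metadata));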

  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;

    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(Charsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}

    static class FileOption extends Options.PathOption
                                    implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }

    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                          implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }

    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }

    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }

    public static Option replication(short value) {
      return new ReplicationOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }

    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }

    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }

    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }
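
    // A hedged sketch of combining these options: exactly one of file() or
    // stream() may be given, and the file-modifier options (bufferSize,
    // replication, blockSize, progressable) only apply with file(). The
    // values below are illustrative.
    //
    //   SequenceFile.createWriter(conf,
    //       Writer.file(new Path("/tmp/data.seq")),
    //       Writer.keyClass(Text.class),
    //       Writer.valueClass(BytesWritable.class),
    //       Writer.replication((short) 3),
    //       Writer.blockSize(128L * 1024 * 1024),
    //       Writer.compression(CompressionType.BLOCK, new DefaultCodec()));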

    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf,
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption =
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption =
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption =
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption =
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption =
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption =
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException("exactly one of file or stream " +
                                           "must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ?
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();
        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null,
           new Metadata());
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }

    /** Create the named file with write-progress reporter.
     * @deprecated Use
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)}
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }

    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader()
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());

      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());

      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata)
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut =
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }
      writeFileHeader();
    }

    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }

    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }
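
    // A hedged sketch of why sync points matter on the read side: a reader
    // positioned at an arbitrary byte offset can skip forward to the next
    // sync point and resume reading whole records. Reader.sync(long) is
    // assumed from SequenceFile.Reader; the offset is illustrative.
    //
    //   reader.sync(12345L);              // advance to the next sync point
    //   while (reader.next(key, value)) {
    //     ... // records now start at a record boundary
    //   }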

    /**
     * flush all currently written data to the file system
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.sync();                               // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }

    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }

    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;

      if (out != null) {

        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();

      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However,
     * the key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
     */
    public synchronized long getLength() throws IOException {
      return out.getPos();
    }

  } // class Writer

  /** Write key/compressed-value pairs to a sequence-format file. */
  static class RecordCompressWriter extends Writer {

    RecordCompressWriter(Configuration conf,
                         Option... options) throws IOException {
      super(conf, options);
    }

    /** Append a key/value pair. */
    @Override
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Compress 'value' and append it
      deflateFilter.resetState();
      compressedValSerializer.serialize(val);
      deflateOut.flush();
      deflateFilter.finish();

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    /** Append a key/value pair. */
    @Override
    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {

      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();                        // sync
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // 'key' data
      val.writeCompressedBytes(out);              // 'value' data
    }

  } // RecordCompressWriter
1428
1429  /** Write compressed key/value blocks to a sequence-format file. */
1430  static class BlockCompressWriter extends Writer {
1431    
1432    private int noBufferedRecords = 0;
1433    
1434    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1435    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1436
1437    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1438    private DataOutputBuffer valBuffer = new DataOutputBuffer();
1439
1440    private final int compressionBlockSize;
1441    
1442    BlockCompressWriter(Configuration conf,
1443                        Option... options) throws IOException {
1444      super(conf, options);
1445      compressionBlockSize = 
1446        conf.getInt("io.seqfile.compress.blocksize", 1000000);
1447      keySerializer.close();
1448      keySerializer.open(keyBuffer);
1449      uncompressedValSerializer.close();
1450      uncompressedValSerializer.open(valBuffer);
1451    }
1452
1453    /** Workhorse to check and write out compressed data/lengths */
1454    private synchronized 
1455      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1456      throws IOException {
1457      deflateFilter.resetState();
1458      buffer.reset();
1459      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1460                       uncompressedDataBuffer.getLength());
1461      deflateOut.flush();
1462      deflateFilter.finish();
1463      
1464      WritableUtils.writeVInt(out, buffer.getLength());
1465      out.write(buffer.getData(), 0, buffer.getLength());
1466    }
1467    
1468    /** Compress buffered records and flush them to the file system. */
1469    @Override
1470    public synchronized void sync() throws IOException {
1471      if (noBufferedRecords > 0) {
1472        super.sync();
1473        
1474        // No. of records
1475        WritableUtils.writeVInt(out, noBufferedRecords);
1476        
1477        // Write 'keys' and lengths
1478        writeBuffer(keyLenBuffer);
1479        writeBuffer(keyBuffer);
1480        
1481        // Write 'values' and lengths
1482        writeBuffer(valLenBuffer);
1483        writeBuffer(valBuffer);
1484        
1485        // Flush the file-stream
1486        out.flush();
1487        
1488        // Reset internal states
1489        keyLenBuffer.reset();
1490        keyBuffer.reset();
1491        valLenBuffer.reset();
1492        valBuffer.reset();
1493        noBufferedRecords = 0;
1494      }
1495      
1496    }
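
        // On-disk layout of one block emitted by sync() above (a summary of
        // the writes in this method, not an authoritative format spec):
        //   sync marker    - sync escape + hash (via super.sync())
        //   record count   - vint
        //   key lengths    - vint compressed-size, then compressed data
        //   keys           - vint compressed-size, then compressed data
        //   value lengths  - vint compressed-size, then compressed data
        //   values         - vint compressed-size, then compressed data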
1497    
1498    /** Close the file. */
1499    @Override
1500    public synchronized void close() throws IOException {
1501      if (out != null) {
1502        sync();
1503      }
1504      super.close();
1505    }
1506
1507    /** Append a key/value pair. */
1508    @Override
1509    @SuppressWarnings("unchecked")
1510    public synchronized void append(Object key, Object val)
1511      throws IOException {
1512      if (key.getClass() != keyClass)
1513        throw new IOException("wrong key class: "+key+" is not "+keyClass);
1514      if (val.getClass() != valClass)
1515        throw new IOException("wrong value class: "+val+" is not "+valClass);
1516
1517      // Save key/value into respective buffers 
1518      int oldKeyLength = keyBuffer.getLength();
1519      keySerializer.serialize(key);
1520      int keyLength = keyBuffer.getLength() - oldKeyLength;
1521      if (keyLength < 0)
1522        throw new IOException("negative length keys not allowed: " + key);
1523      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1524
1525      int oldValLength = valBuffer.getLength();
1526      uncompressedValSerializer.serialize(val);
1527      int valLength = valBuffer.getLength() - oldValLength;
1528      WritableUtils.writeVInt(valLenBuffer, valLength);
1529      
1530      // Added another key/value pair
1531      ++noBufferedRecords;
1532      
1533      // Compress and flush?
1534      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1535      if (currentBlockSize >= compressionBlockSize) {
1536        sync();
1537      }
1538    }
1539    
1540    /** Append a key/value pair in raw form; value bytes are buffered uncompressed. */
1541    @Override
1542    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1543        int keyLength, ValueBytes val) throws IOException {
1544      
1545      if (keyLength < 0)
1546        throw new IOException("negative length keys not allowed");
1547
1548      int valLength = val.getSize();
1549      
1550      // Save key/value data in relevant buffers
1551      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1552      keyBuffer.write(keyData, keyOffset, keyLength);
1553      WritableUtils.writeVInt(valLenBuffer, valLength);
1554      val.writeUncompressedBytes(valBuffer);
1555
1556      // Added another key/value pair
1557      ++noBufferedRecords;
1558
1559      // Compress and flush?
1560      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1561      if (currentBlockSize >= compressionBlockSize) {
1562        sync();
1563      }
1564    }
1565  
1566  } // BlockCompressWriter
1567
1568  /** Get the configured buffer size */
1569  private static int getBufferSize(Configuration conf) {
1570    return conf.getInt("io.file.buffer.size", 4096);
1571  }
1572
1573  /** Reads key/value pairs from a sequence-format file. */
1574  public static class Reader implements java.io.Closeable {
1575    private String filename;
1576    private FSDataInputStream in;
1577    private DataOutputBuffer outBuf = new DataOutputBuffer();
1578
1579    private byte version;
1580
1581    private String keyClassName;
1582    private String valClassName;
1583    private Class keyClass;
1584    private Class valClass;
1585
1586    private CompressionCodec codec = null;
1587    private Metadata metadata = null;
1588    
1589    private byte[] sync = new byte[SYNC_HASH_SIZE];
1590    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1591    private boolean syncSeen;
1592
1593    private long headerEnd;
1594    private long end;
1595    private int keyLength;
1596    private int recordLength;
1597
1598    private boolean decompress;
1599    private boolean blockCompressed;
1600    
1601    private Configuration conf;
1602
1603    private int noBufferedRecords = 0;
1604    private boolean lazyDecompress = true;
1605    private boolean valuesDecompressed = true;
1606    
1607    private int noBufferedKeys = 0;
1608    private int noBufferedValues = 0;
1609    
1610    private DataInputBuffer keyLenBuffer = null;
1611    private CompressionInputStream keyLenInFilter = null;
1612    private DataInputStream keyLenIn = null;
1613    private Decompressor keyLenDecompressor = null;
1614    private DataInputBuffer keyBuffer = null;
1615    private CompressionInputStream keyInFilter = null;
1616    private DataInputStream keyIn = null;
1617    private Decompressor keyDecompressor = null;
1618
1619    private DataInputBuffer valLenBuffer = null;
1620    private CompressionInputStream valLenInFilter = null;
1621    private DataInputStream valLenIn = null;
1622    private Decompressor valLenDecompressor = null;
1623    private DataInputBuffer valBuffer = null;
1624    private CompressionInputStream valInFilter = null;
1625    private DataInputStream valIn = null;
1626    private Decompressor valDecompressor = null;
1627    
1628    private Deserializer keyDeserializer;
1629    private Deserializer valDeserializer;
1630
1631    /**
1632     * A tag interface for all of the Reader options
1633     */
1634    public static interface Option {}
1635    
1636    /**
1637     * Create an option to specify the path name of the sequence file.
1638     * @param value the path to read
1639     * @return a new option
1640     */
1641    public static Option file(Path value) {
1642      return new FileOption(value);
1643    }
1644    
1645    /**
1646     * Create an option to specify the stream with the sequence file.
1647     * @param value the stream to read.
1648     * @return a new option
1649     */
1650    public static Option stream(FSDataInputStream value) {
1651      return new InputStreamOption(value);
1652    }
1653    
1654    /**
1655     * Create an option to specify the starting byte to read.
1656     * @param value the number of bytes to skip over
1657     * @return a new option
1658     */
1659    public static Option start(long value) {
1660      return new StartOption(value);
1661    }
1662    
1663    /**
1664     * Create an option to specify the number of bytes to read.
1665     * @param value the number of bytes to read
1666     * @return a new option
1667     */
1668    public static Option length(long value) {
1669      return new LengthOption(value);
1670    }
1671    
1672    /**
1673     * Create an option with the buffer size for reading the given pathname.
1674     * @param value the number of bytes to buffer
1675     * @return a new option
1676     */
1677    public static Option bufferSize(int value) {
1678      return new BufferSizeOption(value);
1679    }
1680
1681    private static class FileOption extends Options.PathOption 
1682                                    implements Option {
1683      private FileOption(Path value) {
1684        super(value);
1685      }
1686    }
1687    
1688    private static class InputStreamOption
1689        extends Options.FSDataInputStreamOption 
1690        implements Option {
1691      private InputStreamOption(FSDataInputStream value) {
1692        super(value);
1693      }
1694    }
1695
1696    private static class StartOption extends Options.LongOption
1697                                     implements Option {
1698      private StartOption(long value) {
1699        super(value);
1700      }
1701    }
1702
1703    private static class LengthOption extends Options.LongOption
1704                                      implements Option {
1705      private LengthOption(long value) {
1706        super(value);
1707      }
1708    }
1709
1710    private static class BufferSizeOption extends Options.IntegerOption
1711                                      implements Option {
1712      private BufferSizeOption(int value) {
1713        super(value);
1714      }
1715    }
1716
1717    // only instantiated directly; no factory method is exposed for this option
1718    private static class OnlyHeaderOption extends Options.BooleanOption 
1719                                          implements Option {
1720      private OnlyHeaderOption() {
1721        super(true);
1722      }
1723    }
1724
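        /**
         * Construct a reader from the given options.  Exactly one of
         * {@link #file} and {@link #stream} must be supplied.  A minimal
         * usage sketch (assumes an existing file written with Text keys and
         * values; <code>conf</code> and <code>path</code> are assumed to be
         * in scope):
         * <pre>
         * SequenceFile.Reader reader =
         *     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
         * try {
         *   Text key = new Text();
         *   Text value = new Text();
         *   while (reader.next(key, value)) {
         *     // process key/value
         *   }
         * } finally {
         *   reader.close();
         * }
         * </pre>
         */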
1725    public Reader(Configuration conf, Option... opts) throws IOException {
1726      // Look up the options, these are null if not set
1727      FileOption fileOpt = Options.getOption(FileOption.class, opts);
1728      InputStreamOption streamOpt = 
1729        Options.getOption(InputStreamOption.class, opts);
1730      StartOption startOpt = Options.getOption(StartOption.class, opts);
1731      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1732      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1733      OnlyHeaderOption headerOnly = 
1734        Options.getOption(OnlyHeaderOption.class, opts);
1735      // check for consistency
1736      if ((fileOpt == null) == (streamOpt == null)) {
1737        throw new 
1738          IllegalArgumentException("File or stream option must be specified");
1739      }
1740      if (fileOpt == null && bufOpt != null) {
1741        throw new IllegalArgumentException("buffer size can only be set when" +
1742                                           " a file is specified.");
1743      }
1744      // figure out the real values
1745      Path filename = null;
1746      FSDataInputStream file;
1747      final long len;
1748      if (fileOpt != null) {
1749        filename = fileOpt.getValue();
1750        FileSystem fs = filename.getFileSystem(conf);
1751        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1752        len = null == lenOpt
1753          ? fs.getFileStatus(filename).getLen()
1754          : lenOpt.getValue();
1755        file = openFile(fs, filename, bufSize, len);
1756      } else {
1757        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1758        file = streamOpt.getValue();
1759      }
1760      long start = startOpt == null ? 0 : startOpt.getValue();
1761      // really set up
1762      initialize(filename, file, start, len, conf, headerOnly != null);
1763    }
1764
1765    /**
1766     * Construct a reader by opening a file from the given file system.
1767     * @param fs The file system used to open the file.
1768     * @param file The file being read.
1769     * @param conf Configuration
1770     * @throws IOException
1771     * @deprecated Use Reader(Configuration, Option...) instead.
1772     */
1773    @Deprecated
1774    public Reader(FileSystem fs, Path file, 
1775                  Configuration conf) throws IOException {
1776      this(conf, file(file.makeQualified(fs)));
1777    }
1778
1779    /**
1780     * Construct a reader by the given input stream.
1781     * @param in An input stream.
1782     * @param buffersize unused
1783     * @param start The starting position.
1784     * @param length The length being read.
1785     * @param conf Configuration
1786     * @throws IOException
1787     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1788     */
1789    @Deprecated
1790    public Reader(FSDataInputStream in, int buffersize,
1791        long start, long length, Configuration conf) throws IOException {
1792      this(conf, stream(in), start(start), length(length));
1793    }
1794
1795    /** Common work of the constructors. */
1796    private void initialize(Path filename, FSDataInputStream in,
1797                            long start, long length, Configuration conf,
1798                            boolean tempReader) throws IOException {
1799      if (in == null) {
1800        throw new IllegalArgumentException("in == null");
1801      }
1802      this.filename = filename == null ? "<unknown>" : filename.toString();
1803      this.in = in;
1804      this.conf = conf;
1805      boolean succeeded = false;
1806      try {
1807        seek(start);
1808        this.end = this.in.getPos() + length;
1809        // if it wrapped around, use the max
1810        if (end < length) {
1811          end = Long.MAX_VALUE;
1812        }
1813        init(tempReader);
1814        succeeded = true;
1815      } finally {
1816        if (!succeeded) {
1817          IOUtils.cleanup(LOG, this.in);
1818        }
1819      }
1820    }
1821
1822    /**
1823     * Override this method to specialize the type of
1824     * {@link FSDataInputStream} returned.
1825     * @param fs The file system used to open the file.
1826     * @param file The file being read.
1827     * @param bufferSize The buffer size used to read the file.
1828     * @param length The length being read if it is >= 0.  Otherwise,
1829     *               the length is not available.
1830     * @return The opened stream.
1831     * @throws IOException
1832     */
1833    protected FSDataInputStream openFile(FileSystem fs, Path file,
1834        int bufferSize, long length) throws IOException {
1835      return fs.open(file, bufferSize);
1836    }
1837    
1838    /**
1839     * Initialize the {@link Reader}
1840     * @param tmpReader <code>true</code> if we are constructing a temporary
1841     *                  reader {@link SequenceFile.Sorter#cloneFileAttributes}, 
1842     *                  and hence do not initialize every component; 
1843     *                  <code>false</code> otherwise.
1844     * @throws IOException
1845     */
1846    private void init(boolean tempReader) throws IOException {
1847      byte[] versionBlock = new byte[VERSION.length];
1848      in.readFully(versionBlock);
1849
1850      if ((versionBlock[0] != VERSION[0]) ||
1851          (versionBlock[1] != VERSION[1]) ||
1852          (versionBlock[2] != VERSION[2]))
1853        throw new IOException(this + " not a SequenceFile");
1854
1855      // Set 'version'
1856      version = versionBlock[3];
1857      if (version > VERSION[3])
1858        throw new VersionMismatchException(VERSION[3], version);
1859
1860      if (version < BLOCK_COMPRESS_VERSION) {
1861        UTF8 className = new UTF8();
1862
1863        className.readFields(in);
1864        keyClassName = className.toStringChecked(); // key class name
1865
1866        className.readFields(in);
1867        valClassName = className.toStringChecked(); // val class name
1868      } else {
1869        keyClassName = Text.readString(in);
1870        valClassName = Text.readString(in);
1871      }
1872
1873      if (version > 2) {                          // if version > 2
1874        this.decompress = in.readBoolean();       // is compressed?
1875      } else {
1876        decompress = false;
1877      }
1878
1879      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1880        this.blockCompressed = in.readBoolean();  // is block-compressed?
1881      } else {
1882        blockCompressed = false;
1883      }
1884      
1885      // if version >= 5
1886      // setup the compression codec
1887      if (decompress) {
1888        if (version >= CUSTOM_COMPRESS_VERSION) {
1889          String codecClassname = Text.readString(in);
1890          try {
1891            Class<? extends CompressionCodec> codecClass
1892              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1893            this.codec = ReflectionUtils.newInstance(codecClass, conf);
1894          } catch (ClassNotFoundException cnfe) {
1895            throw new IllegalArgumentException("Unknown codec: " + 
1896                                               codecClassname, cnfe);
1897          }
1898        } else {
1899          codec = new DefaultCodec();
1900          ((Configurable)codec).setConf(conf);
1901        }
1902      }
1903      
1904      this.metadata = new Metadata();
1905      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1906        this.metadata.readFields(in);
1907      }
1908      
1909      if (version > 1) {                          // if version > 1
1910        in.readFully(sync);                       // read sync bytes
1911        headerEnd = in.getPos();                  // record end of header
1912      }
1913      
1914      // Initialize... but *not* if we are constructing a temporary Reader
1915      if (!tempReader) {
1916        valBuffer = new DataInputBuffer();
1917        if (decompress) {
1918          valDecompressor = CodecPool.getDecompressor(codec);
1919          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1920          valIn = new DataInputStream(valInFilter);
1921        } else {
1922          valIn = valBuffer;
1923        }
1924
1925        if (blockCompressed) {
1926          keyLenBuffer = new DataInputBuffer();
1927          keyBuffer = new DataInputBuffer();
1928          valLenBuffer = new DataInputBuffer();
1929
1930          keyLenDecompressor = CodecPool.getDecompressor(codec);
1931          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
1932                                                   keyLenDecompressor);
1933          keyLenIn = new DataInputStream(keyLenInFilter);
1934
1935          keyDecompressor = CodecPool.getDecompressor(codec);
1936          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
1937          keyIn = new DataInputStream(keyInFilter);
1938
1939          valLenDecompressor = CodecPool.getDecompressor(codec);
1940          valLenInFilter = codec.createInputStream(valLenBuffer, 
1941                                                   valLenDecompressor);
1942          valLenIn = new DataInputStream(valLenInFilter);
1943        }
1944        
1945        SerializationFactory serializationFactory =
1946          new SerializationFactory(conf);
1947        this.keyDeserializer =
1948          getDeserializer(serializationFactory, getKeyClass());
1949        if (this.keyDeserializer == null) {
1950          throw new IOException(
1951              "Could not find a deserializer for the Key class: '"
1952                  + getKeyClass().getCanonicalName() + "'. "
1953                  + "Please ensure that the configuration '" +
1954                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1955                  + "properly configured, if you're using "
1956                  + "custom serialization.");
1957        }
1958        if (!blockCompressed) {
1959          this.keyDeserializer.open(valBuffer);
1960        } else {
1961          this.keyDeserializer.open(keyIn);
1962        }
1963        this.valDeserializer =
1964          getDeserializer(serializationFactory, getValueClass());
1965        if (this.valDeserializer == null) {
1966          throw new IOException(
1967              "Could not find a deserializer for the Value class: '"
1968                  + getValueClass().getCanonicalName() + "'. "
1969                  + "Please ensure that the configuration '" +
1970                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1971                  + "properly configured, if you're using "
1972                  + "custom serialization.");
1973        }
1974        this.valDeserializer.open(valIn);
1975      }
1976    }
1977    
1978    @SuppressWarnings("unchecked")
1979    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
1980      return sf.getDeserializer(c);
1981    }
1982    
1983    /** Close the file. */
1984    @Override
1985    public synchronized void close() throws IOException {
1986      // Return the decompressors to the pool
1987      CodecPool.returnDecompressor(keyLenDecompressor);
1988      CodecPool.returnDecompressor(keyDecompressor);
1989      CodecPool.returnDecompressor(valLenDecompressor);
1990      CodecPool.returnDecompressor(valDecompressor);
1991      keyLenDecompressor = keyDecompressor = null;
1992      valLenDecompressor = valDecompressor = null;
1993      
1994      if (keyDeserializer != null) {
1995        keyDeserializer.close();
1996      }
1997      if (valDeserializer != null) {
1998        valDeserializer.close();
1999      }
2000      
2001      // Close the input-stream
2002      in.close();
2003    }
2004
2005    /** Returns the name of the key class. */
2006    public String getKeyClassName() {
2007      return keyClassName;
2008    }
2009
2010    /** Returns the class of keys in this file. */
2011    public synchronized Class<?> getKeyClass() {
2012      if (null == keyClass) {
2013        try {
2014          keyClass = WritableName.getClass(getKeyClassName(), conf);
2015        } catch (IOException e) {
2016          throw new RuntimeException(e);
2017        }
2018      }
2019      return keyClass;
2020    }
2021
2022    /** Returns the name of the value class. */
2023    public String getValueClassName() {
2024      return valClassName;
2025    }
2026
2027    /** Returns the class of values in this file. */
2028    public synchronized Class<?> getValueClass() {
2029      if (null == valClass) {
2030        try {
2031          valClass = WritableName.getClass(getValueClassName(), conf);
2032        } catch (IOException e) {
2033          throw new RuntimeException(e);
2034        }
2035      }
2036      return valClass;
2037    }
2038
2039    /** Returns true if values are compressed. */
2040    public boolean isCompressed() { return decompress; }
2041    
2042    /** Returns true if records are block-compressed. */
2043    public boolean isBlockCompressed() { return blockCompressed; }
2044    
2045    /** Returns the compression codec of data in this file. */
2046    public CompressionCodec getCompressionCodec() { return codec; }
2047    
2048    /**
2049     * Get the compression type for this file.
2050     * @return the compression type
2051     */
2052    public CompressionType getCompressionType() {
2053      if (decompress) {
2054        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2055      } else {
2056        return CompressionType.NONE;
2057      }
2058    }
2059
2060    /** Returns the metadata object of the file */
2061    public Metadata getMetadata() {
2062      return this.metadata;
2063    }
2064    
2065    /** Returns the configuration used for this file. */
2066    Configuration getConf() { return conf; }
2067    
2068    /** Read a compressed buffer */
2069    private synchronized void readBuffer(DataInputBuffer buffer, 
2070                                         CompressionInputStream filter) throws IOException {
2071      // Read data into a temporary buffer
2072      DataOutputBuffer dataBuffer = new DataOutputBuffer();
2073
2074      try {
2075        int dataBufferLength = WritableUtils.readVInt(in);
2076        dataBuffer.write(in, dataBufferLength);
2077      
2078        // Set up 'buffer' connected to the input-stream
2079        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2080      } finally {
2081        dataBuffer.close();
2082      }
2083
2084      // Reset the codec
2085      filter.resetState();
2086    }
2087    
2088    /** Read the next 'compressed' block */
2089    private synchronized void readBlock() throws IOException {
2090      // Check if we need to throw away a whole block of 
2091      // 'values' due to 'lazy decompression' 
2092      if (lazyDecompress && !valuesDecompressed) {
2093        in.seek(WritableUtils.readVInt(in)+in.getPos());
2094        in.seek(WritableUtils.readVInt(in)+in.getPos());
2095      }
2096      
2097      // Reset internal states
2098      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2099      valuesDecompressed = false;
2100
2101      //Process sync
2102      if (sync != null) {
2103        in.readInt();
2104        in.readFully(syncCheck);                // read syncCheck
2105        if (!Arrays.equals(sync, syncCheck))    // check it
2106          throw new IOException("File is corrupt!");
2107      }
2108      syncSeen = true;
2109
2110      // Read number of records in this block
2111      noBufferedRecords = WritableUtils.readVInt(in);
2112      
2113      // Read key lengths and keys
2114      readBuffer(keyLenBuffer, keyLenInFilter);
2115      readBuffer(keyBuffer, keyInFilter);
2116      noBufferedKeys = noBufferedRecords;
2117      
2118      // Read value lengths and values
2119      if (!lazyDecompress) {
2120        readBuffer(valLenBuffer, valLenInFilter);
2121        readBuffer(valBuffer, valInFilter);
2122        noBufferedValues = noBufferedRecords;
2123        valuesDecompressed = true;
2124      }
2125    }
2126
2127    /** 
2128     * Position valLenIn/valIn to the 'value' 
2129     * corresponding to the 'current' key 
2130     */
2131    private synchronized void seekToCurrentValue() throws IOException {
2132      if (!blockCompressed) {
2133        if (decompress) {
2134          valInFilter.resetState();
2135        }
2136        valBuffer.reset();
2137      } else {
2138        // Check if this is the first value in the 'block' to be read
2139        if (lazyDecompress && !valuesDecompressed) {
2140          // Read the value lengths and values
2141          readBuffer(valLenBuffer, valLenInFilter);
2142          readBuffer(valBuffer, valInFilter);
2143          noBufferedValues = noBufferedRecords;
2144          valuesDecompressed = true;
2145        }
2146        
2147        // Calculate the no. of bytes to skip
2148        // Note: 'current' key has already been read!
2149        int skipValBytes = 0;
2150        int currentKey = noBufferedKeys + 1;          
2151        for (int i=noBufferedValues; i > currentKey; --i) {
2152          skipValBytes += WritableUtils.readVInt(valLenIn);
2153          --noBufferedValues;
2154        }
2155        
2156        // Skip to the 'val' corresponding to 'current' key
2157        if (skipValBytes > 0) {
2158          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2159            throw new IOException("Failed to seek to " + currentKey + 
2160                                  "(th) value!");
2161          }
2162        }
2163      }
2164    }
2165
2166    /**
2167     * Get the 'value' corresponding to the last read 'key'.
2168     * @param val : The 'value' to be read.
2169     * @throws IOException
2170     */
2171    public synchronized void getCurrentValue(Writable val) 
2172      throws IOException {
2173      if (val instanceof Configurable) {
2174        ((Configurable) val).setConf(this.conf);
2175      }
2176
2177      // Position stream to 'current' value
2178      seekToCurrentValue();
2179
2180      if (!blockCompressed) {
2181        val.readFields(valIn);
2182        
2183        if (valIn.read() > 0) {
2184          LOG.info("available bytes: " + valIn.available());
2185          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2186                                + " bytes, should read " +
2187                                (valBuffer.getLength()-keyLength));
2188        }
2189      } else {
2190        // Get the value
2191        int valLength = WritableUtils.readVInt(valLenIn);
2192        val.readFields(valIn);
2193        
2194        // Read another compressed 'value'
2195        --noBufferedValues;
2196        
2197        // Sanity check: a length read from the stream should never be negative
2198        if ((valLength < 0) && LOG.isDebugEnabled()) {
2199          LOG.debug(val + " has a negative value length: " + valLength);
2200        }
2201      }
2202
2203    }
2204    
2205    /**
2206     * Get the 'value' corresponding to the last read 'key'.
2207     * @param val : The 'value' to be read.
2208     * @throws IOException
2209     */
2210    public synchronized Object getCurrentValue(Object val) 
2211      throws IOException {
2212      if (val instanceof Configurable) {
2213        ((Configurable) val).setConf(this.conf);
2214      }
2215
2216      // Position stream to 'current' value
2217      seekToCurrentValue();
2218
2219      if (!blockCompressed) {
2220        val = deserializeValue(val);
2221        
2222        if (valIn.read() > 0) {
2223          LOG.info("available bytes: " + valIn.available());
2224          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2225                                + " bytes, should read " +
2226                                (valBuffer.getLength()-keyLength));
2227        }
2228      } else {
2229        // Get the value
2230        int valLength = WritableUtils.readVInt(valLenIn);
2231        val = deserializeValue(val);
2232        
2233        // Read another compressed 'value'
2234        --noBufferedValues;
2235        
2236        // Sanity check: a length read from the stream should never be negative
2237        if ((valLength < 0) && LOG.isDebugEnabled()) {
2238          LOG.debug(val + " has a negative value length: " + valLength);
2239        }
2240      }
2241      return val;
2242
2243    }
2244
2245    @SuppressWarnings("unchecked")
2246    private Object deserializeValue(Object val) throws IOException {
2247      return valDeserializer.deserialize(val);
2248    }
2249    
2250    /** Read the next key in the file into <code>key</code>, skipping its
2251     * value.  Returns true if another entry exists, and false at end of file. */
2252    public synchronized boolean next(Writable key) throws IOException {
2253      if (key.getClass() != getKeyClass())
2254        throw new IOException("wrong key class: "+key.getClass().getName()
2255                              +" is not "+keyClass);
2256
2257      if (!blockCompressed) {
2258        outBuf.reset();
2259        
2260        keyLength = next(outBuf);
2261        if (keyLength < 0)
2262          return false;
2263        
2264        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2265        
2266        key.readFields(valBuffer);
2267        valBuffer.mark(0);
2268        if (valBuffer.getPosition() != keyLength)
2269          throw new IOException(key + " read " + valBuffer.getPosition()
2270                                + " bytes, should read " + keyLength);
2271      } else {
2272        //Reset syncSeen
2273        syncSeen = false;
2274        
2275        if (noBufferedKeys == 0) {
2276          try {
2277            readBlock();
2278          } catch (EOFException eof) {
2279            return false;
2280          }
2281        }
2282        
2283        int keyLength = WritableUtils.readVInt(keyLenIn);
2284        
2285        // Sanity check
2286        if (keyLength < 0) {
2287          return false;
2288        }
2289        
2290        //Read another compressed 'key'
2291        key.readFields(keyIn);
2292        --noBufferedKeys;
2293      }
2294
2295      return true;
2296    }
2297
2298    /** Read the next key/value pair in the file into <code>key</code> and
2299     * <code>val</code>.  Returns true if such a pair exists and false when at
2300     * end of file */
2301    public synchronized boolean next(Writable key, Writable val)
2302      throws IOException {
2303      if (val.getClass() != getValueClass())
2304        throw new IOException("wrong value class: "+val+" is not "+valClass);
2305
2306      boolean more = next(key);
2307      
2308      if (more) {
2309        getCurrentValue(val);
2310      }
2311
2312      return more;
2313    }
2314    
2315    /**
2316     * Read and return the next record length, potentially skipping over 
2317     * a sync block.
2318     * @return the length of the next record or -1 if there is no next record
2319     * @throws IOException
2320     */
2321    private synchronized int readRecordLength() throws IOException {
2322      if (in.getPos() >= end) {
2323        return -1;
2324      }      
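          // A length field equal to SYNC_ESCAPE (-1) marks a sync block
          // rather than a record: verify the sync hash that follows, then
          // re-read the real record length.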
2325      int length = in.readInt();
2326      if (version > 1 && sync != null &&
2327          length == SYNC_ESCAPE) {              // process a sync entry
2328        in.readFully(syncCheck);                // read syncCheck
2329        if (!Arrays.equals(sync, syncCheck))    // check it
2330          throw new IOException("File is corrupt!");
2331        syncSeen = true;
2332        if (in.getPos() >= end) {
2333          return -1;
2334        }
2335        length = in.readInt();                  // re-read length
2336      } else {
2337        syncSeen = false;
2338      }
2339      
2340      return length;
2341    }
2342    
2343    /** Read the next key/value pair in the file into <code>buffer</code>.
2344     * Returns the length of the key read, or -1 if at end of file.  The length
2345     * of the value may be computed by calling buffer.getLength() before and
2346     * after calls to this method.
2347     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2348    @Deprecated
2349    synchronized int next(DataOutputBuffer buffer) throws IOException {
2350      // Unsupported for block-compressed sequence files
2351      if (blockCompressed) {
2352        throw new IOException("Unsupported call for block-compressed" +
2353                              " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
2354      }
2355      try {
2356        int length = readRecordLength();
2357        if (length == -1) {
2358          return -1;
2359        }
2360        int keyLength = in.readInt();
2361        buffer.write(in, length);
2362        return keyLength;
2363      } catch (ChecksumException e) {             // checksum failure
2364        handleChecksumException(e);
2365        return next(buffer);
2366      }
2367    }
2368
2369    public ValueBytes createValueBytes() {
2370      ValueBytes val = null;
2371      if (!decompress || blockCompressed) {
2372        val = new UncompressedBytes();
2373      } else {
2374        val = new CompressedBytes(codec);
2375      }
2376      return val;
2377    }
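
        // Note: block-compressed files also get UncompressedBytes, because
        // readBlock() decompresses whole blocks up front and hands values out
        // uncompressed; only record-compressed streams return CompressedBytes.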
2378
2379    /**
2380     * Read 'raw' records.
2381     * @param key - The buffer into which the key is read
2382     * @param val - The 'raw' value
2383     * @return Returns the total record length or -1 for end of file
2384     * @throws IOException
2385     */
2386    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2387      throws IOException {
2388      if (!blockCompressed) {
2389        int length = readRecordLength();
2390        if (length == -1) {
2391          return -1;
2392        }
2393        int keyLength = in.readInt();
2394        int valLength = length - keyLength;
2395        key.write(in, keyLength);
2396        if (decompress) {
2397          CompressedBytes value = (CompressedBytes)val;
2398          value.reset(in, valLength);
2399        } else {
2400          UncompressedBytes value = (UncompressedBytes)val;
2401          value.reset(in, valLength);
2402        }
2403        
2404        return length;
2405      } else {
2406        //Reset syncSeen
2407        syncSeen = false;
2408        
2409        // Read 'key'
2410        if (noBufferedKeys == 0) {
2411          if (in.getPos() >= end) 
2412            return -1;
2413
2414          try { 
2415            readBlock();
2416          } catch (EOFException eof) {
2417            return -1;
2418          }
2419        }
2420        int keyLength = WritableUtils.readVInt(keyLenIn);
2421        if (keyLength < 0) {
2422          throw new IOException("negative length key found: " + keyLength);
2423        }
2424        key.write(keyIn, keyLength);
2425        --noBufferedKeys;
2426        
2427        // Read raw 'value'
2428        seekToCurrentValue();
2429        int valLength = WritableUtils.readVInt(valLenIn);
2430        UncompressedBytes rawValue = (UncompressedBytes)val;
2431        rawValue.reset(valIn, valLength);
2432        --noBufferedValues;
2433        
2434        return (keyLength+valLength);
2435      }
2436      
2437    }
2438
2439    /**
2440     * Read 'raw' keys.
2441     * @param key - The buffer into which the key is read
2442     * @return Returns the key length or -1 for end of file
2443     * @throws IOException
2444     */
2445    public synchronized int nextRawKey(DataOutputBuffer key) 
2446      throws IOException {
2447      if (!blockCompressed) {
2448        recordLength = readRecordLength();
2449        if (recordLength == -1) {
2450          return -1;
2451        }
2452        keyLength = in.readInt();
2453        key.write(in, keyLength);
2454        return keyLength;
2455      } else {
2456        //Reset syncSeen
2457        syncSeen = false;
2458        
2459        // Read 'key'
2460        if (noBufferedKeys == 0) {
2461          if (in.getPos() >= end) 
2462            return -1;
2463
2464          try { 
2465            readBlock();
2466          } catch (EOFException eof) {
2467            return -1;
2468          }
2469        }
2470        int keyLength = WritableUtils.readVInt(keyLenIn);
2471        if (keyLength < 0) {
2472          throw new IOException("negative length key found: " + keyLength);
2473        }
2474        key.write(keyIn, keyLength);
2475        --noBufferedKeys;
2476        
2477        return keyLength;
2478      }
2479      
2480    }
2481
2482    /** Read the next key in the file, skipping its
2483     * value.  Return null at end of file. */
2484    public synchronized Object next(Object key) throws IOException {
2485      if (key != null && key.getClass() != getKeyClass()) {
2486        throw new IOException("wrong key class: "+key.getClass().getName()
2487                              +" is not "+keyClass);
2488      }
2489
2490      if (!blockCompressed) {
2491        outBuf.reset();
2492        
2493        keyLength = next(outBuf);
2494        if (keyLength < 0)
2495          return null;
2496        
2497        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2498        
2499        key = deserializeKey(key);
2500        valBuffer.mark(0);
2501        if (valBuffer.getPosition() != keyLength)
2502          throw new IOException(key + " read " + valBuffer.getPosition()
2503                                + " bytes, should read " + keyLength);
2504      } else {
2505        //Reset syncSeen
2506        syncSeen = false;
2507        
2508        if (noBufferedKeys == 0) {
2509          try {
2510            readBlock();
2511          } catch (EOFException eof) {
2512            return null;
2513          }
2514        }
2515        
2516        int keyLength = WritableUtils.readVInt(keyLenIn);
2517        
2518        // Sanity check
2519        if (keyLength < 0) {
2520          return null;
2521        }
2522        
2523        //Read another compressed 'key'
2524        key = deserializeKey(key);
2525        --noBufferedKeys;
2526      }
2527
2528      return key;
2529    }
2530
2531    @SuppressWarnings("unchecked")
2532    private Object deserializeKey(Object key) throws IOException {
2533      return keyDeserializer.deserialize(key);
2534    }
2535
2536    /**
2537     * Read 'raw' values.
2538     * @param val - The 'raw' value
2539     * @return Returns the value length
2540     * @throws IOException
2541     */
2542    public synchronized int nextRawValue(ValueBytes val) 
2543      throws IOException {
2544      
2545      // Position stream to current value
2546      seekToCurrentValue();
2547 
2548      if (!blockCompressed) {
2549        int valLength = recordLength - keyLength;
2550        if (decompress) {
2551          CompressedBytes value = (CompressedBytes)val;
2552          value.reset(in, valLength);
2553        } else {
2554          UncompressedBytes value = (UncompressedBytes)val;
2555          value.reset(in, valLength);
2556        }
2557         
2558        return valLength;
2559      } else {
2560        int valLength = WritableUtils.readVInt(valLenIn);
2561        UncompressedBytes rawValue = (UncompressedBytes)val;
2562        rawValue.reset(valIn, valLength);
2563        --noBufferedValues;
2564        return valLength;
2565      }
2566      
2567    }
2568
2569    private void handleChecksumException(ChecksumException e)
2570      throws IOException {
2571      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2572        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2573        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2574      } else {
2575        throw e;
2576      }
2577    }
2578
2579    /** Disables sync checking. Often invoked for temporary files. */
2580    synchronized void ignoreSync() {
2581      sync = null;
2582    }
2583    
2584    /** Set the current byte position in the input file.
2585     *
2586     * <p>The position passed must be a position returned by {@link
2587     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2588     * position, use {@link SequenceFile.Reader#sync(long)}.
2589     */
2590    public synchronized void seek(long position) throws IOException {
2591      in.seek(position);
2592      if (blockCompressed) {                      // trigger block read
2593        noBufferedKeys = 0;
2594        valuesDecompressed = true;
2595      }
2596    }
2597
2598    /** Seek to the next sync mark past a given position.*/
2599    public synchronized void sync(long position) throws IOException {
2600      if (position+SYNC_SIZE >= end) {
2601        seek(end);
2602        return;
2603      }
2604
2605      if (position < headerEnd) {
2606        // seek directly to first record
2607        in.seek(headerEnd);
2608        // note the sync marker "seen" in the header
2609        syncSeen = true;
2610        return;
2611      }
2612
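          // Scan forward byte-by-byte, treating syncCheck as a circular
          // buffer of the last SYNC_HASH_SIZE bytes read; once it matches the
          // file's sync hash, rewind to just before the sync block.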
2613      try {
2614        seek(position+4);                         // skip escape
2615        in.readFully(syncCheck);
2616        int syncLen = sync.length;
2617        for (int i = 0; in.getPos() < end; i++) {
2618          int j = 0;
2619          for (; j < syncLen; j++) {
2620            if (sync[j] != syncCheck[(i+j)%syncLen])
2621              break;
2622          }
2623          if (j == syncLen) {
2624            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2625            return;
2626          }
2627          syncCheck[i%syncLen] = in.readByte();
2628        }
2629      } catch (ChecksumException e) {             // checksum failure
2630        handleChecksumException(e);
2631      }
2632    }
2633
2634    /** Returns true iff the previous call to next passed a sync mark.*/
2635    public synchronized boolean syncSeen() { return syncSeen; }
2636
2637    /** Return the current byte position in the input file. */
2638    public synchronized long getPosition() throws IOException {
2639      return in.getPos();
2640    }
2641
2642    /** Returns the name of the file. */
2643    @Override
2644    public String toString() {
2645      return filename;
2646    }
2647
2648  }
2649
2650  /** Sorts key/value pairs in a sequence-format file.
2651   *
2652   * <p>For best performance, applications should make sure that the {@link
2653   * Writable#readFields(DataInput)} implementation of their keys is
2654   * very efficient.  In particular, it should avoid allocating memory.
2655   */
2656  public static class Sorter {
2657
2658    private RawComparator comparator;
2659
2660    private MergeSort mergeSort; //the implementation of merge sort
2661    
2662    private Path[] inFiles;                     // when merging or sorting
2663
2664    private Path outFile;
2665
2666    private int memory; // bytes
2667    private int factor; // merged per pass
2668
2669    private FileSystem fs = null;
2670
2671    private Class keyClass;
2672    private Class valClass;
2673
2674    private Configuration conf;
2675    private Metadata metadata;
2676    
2677    private Progressable progressable = null;
2678
2679    /** Sort and merge files containing the named classes. */
2680    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2681                  Class valClass, Configuration conf)  {
2682      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
2683    }
2684
2685    /** Sort and merge using an arbitrary {@link RawComparator}. */
2686    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2687                  Class valClass, Configuration conf) {
2688      this(fs, comparator, keyClass, valClass, conf, new Metadata());
2689    }
2690
2691    /** Sort and merge using an arbitrary {@link RawComparator}. */
2692    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2693                  Class valClass, Configuration conf, Metadata metadata) {
2694      this.fs = fs;
2695      this.comparator = comparator;
2696      this.keyClass = keyClass;
2697      this.valClass = valClass;
2698      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2699      this.factor = conf.getInt("io.sort.factor", 100);
2700      this.conf = conf;
2701      this.metadata = metadata;
2702    }
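
        // io.sort.mb bounds the in-memory sort buffer (in megabytes), and
        // io.sort.factor bounds how many segments one merge pass opens at once.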
2703
2704    /** Set the number of streams to merge at once.*/
2705    public void setFactor(int factor) { this.factor = factor; }
2706
2707    /** Get the number of streams to merge at once.*/
2708    public int getFactor() { return factor; }
2709
2710    /** Set the total amount of buffer memory, in bytes.*/
2711    public void setMemory(int memory) { this.memory = memory; }
2712
2713    /** Get the total amount of buffer memory, in bytes.*/
2714    public int getMemory() { return memory; }
2715
2716    /** Set the progressable object in order to report progress. */
2717    public void setProgressable(Progressable progressable) {
2718      this.progressable = progressable;
2719    }
2720    
2721    /** 
2722     * Perform a file sort from a set of input files into an output file.
2723     * @param inFiles the files to be sorted
2724     * @param outFile the sorted output file
2725     * @param deleteInput should the input files be deleted as they are read?
2726     */
2727    public void sort(Path[] inFiles, Path outFile,
2728                     boolean deleteInput) throws IOException {
2729      if (fs.exists(outFile)) {
2730        throw new IOException("already exists: " + outFile);
2731      }
2732
2733      this.inFiles = inFiles;
2734      this.outFile = outFile;
2735
2736      int segments = sortPass(deleteInput);
2737      if (segments > 1) {
2738        mergePass(outFile.getParent());
2739      }
2740    }
2741
2742    /** 
2743     * Perform a file sort from a set of input files and return an iterator.
2744     * @param inFiles the files to be sorted
2745     * @param tempDir the directory where temp files are created during sort
2746     * @param deleteInput should the input files be deleted as they are read?
2747     * @return a RawKeyValueIterator over the sorted output
2748     */
2749    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2750                                              boolean deleteInput) throws IOException {
2751      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2752      if (fs.exists(outFile)) {
2753        throw new IOException("already exists: " + outFile);
2754      }
2755      this.inFiles = inFiles;
2756      //outFile will basically be used as prefix for temp files in the cases
2757      //where sort outputs multiple sorted segments. For the single segment
2758      //case, the outputFile itself will contain the sorted data for that
2759      //segment
2760      this.outFile = outFile;
2761
2762      int segments = sortPass(deleteInput);
2763      if (segments > 1)
2764        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2765                     tempDir);
2766      else if (segments == 1)
2767        return merge(new Path[]{outFile}, true, tempDir);
2768      else return null;
2769    }
2770
2771    /**
2772     * The backwards compatible interface to sort.
2773     * @param inFile the input file to sort
2774     * @param outFile the sorted output file
2775     */
2776    public void sort(Path inFile, Path outFile) throws IOException {
2777      sort(new Path[]{inFile}, outFile, false);
2778    }
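
        // A minimal usage sketch (path names are hypothetical; assumes the
        // input is a sequence file of Text keys and values):
        //
        //   SequenceFile.Sorter sorter =
        //       new SequenceFile.Sorter(fs, Text.class, Text.class, conf);
        //   sorter.sort(new Path("in.seq"), new Path("in.seq.sorted"));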
2779    
2780    private int sortPass(boolean deleteInput) throws IOException {
2781      if(LOG.isDebugEnabled()) {
2782        LOG.debug("running sort pass");
2783      }
2784      SortPass sortPass = new SortPass();         // make the SortPass
2785      sortPass.setProgressable(progressable);
2786      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2787      try {
2788        return sortPass.run(deleteInput);         // run it
2789      } finally {
2790        sortPass.close();                         // close it
2791      }
2792    }
2793
2794    private class SortPass {
2795      private int memoryLimit = memory/4;
2796      private int recordLimit = 1000000;
2797      
2798      private DataOutputBuffer rawKeys = new DataOutputBuffer();
2799      private byte[] rawBuffer;
2800
2801      private int[] keyOffsets = new int[1024];
2802      private int[] pointers = new int[keyOffsets.length];
2803      private int[] pointersCopy = new int[keyOffsets.length];
2804      private int[] keyLengths = new int[keyOffsets.length];
2805      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2806      
2807      private ArrayList segmentLengths = new ArrayList();
2808      
2809      private Reader in = null;
2810      private FSDataOutputStream out = null;
2811      private FSDataOutputStream indexOut = null;
2812      private Path outName;
2813
2814      private Progressable progressable = null;
2815
2816      public int run(boolean deleteInput) throws IOException {
2817        int segments = 0;
2818        int currentFile = 0;
2819        boolean atEof = (currentFile >= inFiles.length);
2820        CompressionType compressionType;
2821        CompressionCodec codec = null;
2822        segmentLengths.clear();
2823        if (atEof) {
2824          return 0;
2825        }
2826        
2827        // Initialize
2828        in = new Reader(fs, inFiles[currentFile], conf);
2829        compressionType = in.getCompressionType();
2830        codec = in.getCompressionCodec();
2831        
2832        for (int i=0; i < rawValues.length; ++i) {
2833          rawValues[i] = null;
2834        }
2835        
2836        while (!atEof) {
2837          int count = 0;
2838          int bytesProcessed = 0;
2839          rawKeys.reset();
2840          while (!atEof && 
2841                 bytesProcessed < memoryLimit && count < recordLimit) {
2842
2843            // Read a record into buffer
2844            // Note: Attempt to re-use 'rawValue' as far as possible
2845            int keyOffset = rawKeys.getLength();       
2846            ValueBytes rawValue = 
2847              (count == keyOffsets.length || rawValues[count] == null) ? 
2848              in.createValueBytes() : 
2849              rawValues[count];
2850            int recordLength = in.nextRaw(rawKeys, rawValue);
2851            if (recordLength == -1) {
2852              in.close();
2853              if (deleteInput) {
2854                fs.delete(inFiles[currentFile], true);
2855              }
2856              currentFile += 1;
2857              atEof = currentFile >= inFiles.length;
2858              if (!atEof) {
2859                in = new Reader(fs, inFiles[currentFile], conf);
2860              } else {
2861                in = null;
2862              }
2863              continue;
2864            }
2865
2866            int keyLength = rawKeys.getLength() - keyOffset;
2867
2868            if (count == keyOffsets.length)
2869              grow();
2870
2871            keyOffsets[count] = keyOffset;                // update pointers
2872            pointers[count] = count;
2873            keyLengths[count] = keyLength;
2874            rawValues[count] = rawValue;
2875
2876            bytesProcessed += recordLength; 
2877            count++;
2878          }
2879
2880          // buffer is full -- sort & flush it
2881          if(LOG.isDebugEnabled()) {
2882            LOG.debug("flushing segment " + segments);
2883          }
2884          rawBuffer = rawKeys.getData();
2885          sort(count);
2886          // indicate we're making progress
2887          if (progressable != null) {
2888            progressable.progress();
2889          }
2890          flush(count, bytesProcessed, compressionType, codec, 
2891                segments==0 && atEof);
2892          segments++;
2893        }
2894        return segments;
2895      }
2896
2897      public void close() throws IOException {
2898        if (in != null) {
2899          in.close();
2900        }
2901        if (out != null) {
2902          out.close();
2903        }
2904        if (indexOut != null) {
2905          indexOut.close();
2906        }
2907      }
2908
2909      private void grow() {
2910        int newLength = keyOffsets.length * 3 / 2;
2911        keyOffsets = grow(keyOffsets, newLength);
2912        pointers = grow(pointers, newLength);
2913        pointersCopy = new int[newLength];
2914        keyLengths = grow(keyLengths, newLength);
2915        rawValues = grow(rawValues, newLength);
2916      }
2917
2918      private int[] grow(int[] old, int newLength) {
2919        int[] result = new int[newLength];
2920        System.arraycopy(old, 0, result, 0, old.length);
2921        return result;
2922      }
2923      
2924      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
2925        ValueBytes[] result = new ValueBytes[newLength];
2926        System.arraycopy(old, 0, result, 0, old.length);
2927        for (int i=old.length; i < newLength; ++i) {
2928          result[i] = null;
2929        }
2930        return result;
2931      }
2932
2933      private void flush(int count, int bytesProcessed, 
2934                         CompressionType compressionType, 
2935                         CompressionCodec codec, 
2936                         boolean done) throws IOException {
2937        if (out == null) {
2938          outName = done ? outFile : outFile.suffix(".0");
2939          out = fs.create(outName);
2940          if (!done) {
2941            indexOut = fs.create(outName.suffix(".index"));
2942          }
2943        }
2944
2945        long segmentStart = out.getPos();
2946        Writer writer = createWriter(conf, Writer.stream(out), 
2947            Writer.keyClass(keyClass), Writer.valueClass(valClass),
2948            Writer.compression(compressionType, codec),
2949            Writer.metadata(done ? metadata : new Metadata()));
2950        
2951        if (!done) {
2952          writer.sync = null;                     // disable sync on temp files
2953        }
2954
2955        for (int i = 0; i < count; i++) {         // write in sorted order
2956          int p = pointers[i];
2957          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
2958        }
2959        writer.close();
2960        
2961        if (!done) {
2962          // Save the segment length
2963          WritableUtils.writeVLong(indexOut, segmentStart);
2964          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
2965          indexOut.flush();
2966        }
2967      }
2968
2969      private void sort(int count) {
2970        System.arraycopy(pointers, 0, pointersCopy, 0, count);
2971        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
2972      }
2973      class SeqFileComparator implements Comparator<IntWritable> {
2974        @Override
2975        public int compare(IntWritable I, IntWritable J) {
2976          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
2977                                    keyLengths[I.get()], rawBuffer, 
2978                                    keyOffsets[J.get()], keyLengths[J.get()]);
2979        }
2980      }
2981      
2982      /** set the progressable object in order to report progress */
2983      public void setProgressable(Progressable progressable)
2984      {
2985        this.progressable = progressable;
2986      }
2987      
2988    } // SequenceFile.Sorter.SortPass
2989
2990    /** The interface to iterate over raw keys/values of SequenceFiles. */
2991    public static interface RawKeyValueIterator {
      /** Gets the current raw key.
       * @return the current raw key as a DataOutputBuffer
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException; 
      /** Gets the current raw value.
       * @return the current raw value as a ValueBytes
       * @throws IOException
       */
      ValueBytes getValue() throws IOException; 
      /** Sets up the current key and value (for getKey and getValue).
       * @return true if a key/value pair exists, false otherwise
       * @throws IOException
       */
      boolean next() throws IOException;
      /** Closes the iterator and the underlying streams.
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; its float value (0.0 - 1.0) indicates
       * the fraction of bytes processed by the iterator so far.
       */
      Progress getProgress();
3015    }    
3016    
3017    /**
3018     * Merges the list of segments of type <code>SegmentDescriptor</code>
3019     * @param segments the list of SegmentDescriptors
3020     * @param tmpDir the directory to write temporary files into
3021     * @return RawKeyValueIterator
3022     * @throws IOException
3023     */
3024    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
3025                                     Path tmpDir) 
3026      throws IOException {
3027      // pass in object to report progress, if present
3028      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3029      return mQueue.merge();
3030    }
3031
3032    /**
3033     * Merges the contents of files passed in Path[] using a max factor value
3034     * that is already set
3035     * @param inNames the array of path names
3036     * @param deleteInputs true if the input files should be deleted when 
3037     * unnecessary
3038     * @param tmpDir the directory to write temporary files into
3039     * @return RawKeyValueIteratorMergeQueue
3040     * @throws IOException
3041     */
3042    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3043                                     Path tmpDir) 
3044      throws IOException {
3045      return merge(inNames, deleteInputs, 
3046                   (inNames.length < factor) ? inNames.length : factor,
3047                   tmpDir);
3048    }
3049
3050    /**
3051     * Merges the contents of files passed in Path[]
3052     * @param inNames the array of path names
3053     * @param deleteInputs true if the input files should be deleted when 
3054     * unnecessary
3055     * @param factor the factor that will be used as the maximum merge fan-in
3056     * @param tmpDir the directory to write temporary files into
3057     * @return RawKeyValueIteratorMergeQueue
3058     * @throws IOException
3059     */
3060    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3061                                     int factor, Path tmpDir) 
3062      throws IOException {
3063      //get the segments from inNames
3064      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3065      for (int i = 0; i < inNames.length; i++) {
3066        SegmentDescriptor s = new SegmentDescriptor(0,
3067            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3068        s.preserveInput(!deleteInputs);
3069        s.doSync();
3070        a.add(s);
3071      }
3072      this.factor = factor;
3073      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3074      return mQueue.merge();
3075    }
3076
3077    /**
3078     * Merges the contents of files passed in Path[]
3079     * @param inNames the array of path names
3080     * @param tempDir the directory for creating temp files during merge
3081     * @param deleteInputs true if the input files should be deleted when 
3082     * unnecessary
3083     * @return RawKeyValueIteratorMergeQueue
3084     * @throws IOException
3085     */
3086    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3087                                     boolean deleteInputs) 
3088      throws IOException {
      //outFile will be used as the prefix for the temp files holding the
      //intermediate merge outputs
3091      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3092      //get the segments from inNames
3093      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3094      for (int i = 0; i < inNames.length; i++) {
3095        SegmentDescriptor s = new SegmentDescriptor(0,
3096            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3097        s.preserveInput(!deleteInputs);
3098        s.doSync();
3099        a.add(s);
3100      }
3101      factor = (inNames.length < factor) ? inNames.length : factor;
3102      // pass in object to report progress, if present
3103      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3104      return mQueue.merge();
3105    }
3106
3107    /**
     * Clones the attributes (like compression) of the input file and creates a
     * corresponding Writer
3110     * @param inputFile the path of the input file whose attributes should be 
3111     * cloned
3112     * @param outputFile the path of the output file 
3113     * @param prog the Progressable to report status during the file write
3114     * @return Writer
3115     * @throws IOException
3116     */
3117    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3118                                      Progressable prog) throws IOException {
3119      Reader reader = new Reader(conf,
3120                                 Reader.file(inputFile),
3121                                 new Reader.OnlyHeaderOption());
3122      CompressionType compress = reader.getCompressionType();
3123      CompressionCodec codec = reader.getCompressionCodec();
3124      reader.close();
3125
3126      Writer writer = createWriter(conf, 
3127                                   Writer.file(outputFile), 
3128                                   Writer.keyClass(keyClass), 
3129                                   Writer.valueClass(valClass), 
3130                                   Writer.compression(compress, codec), 
3131                                   Writer.progressable(prog));
3132      return writer;
3133    }
3134
3135    /**
3136     * Writes records from RawKeyValueIterator into a file represented by the 
3137     * passed writer
3138     * @param records the RawKeyValueIterator
3139     * @param writer the Writer created earlier 
3140     * @throws IOException
3141     */
3142    public void writeFile(RawKeyValueIterator records, Writer writer) 
3143      throws IOException {
3144      while(records.next()) {
3145        writer.appendRaw(records.getKey().getData(), 0, 
3146                         records.getKey().getLength(), records.getValue());
3147      }
3148      writer.sync();
3149    }
3150        
3151    /** Merge the provided files.
3152     * @param inFiles the array of input path names
3153     * @param outFile the final output file
3154     * @throws IOException
3155     */
3156    public void merge(Path[] inFiles, Path outFile) throws IOException {
3157      if (fs.exists(outFile)) {
3158        throw new IOException("already exists: " + outFile);
3159      }
3160      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3161      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3162      
3163      writeFile(r, writer);
3164
3165      writer.close();
3166    }
3167
3168    /** sort calls this to generate the final merged output */
3169    private int mergePass(Path tmpDir) throws IOException {
3170      if(LOG.isDebugEnabled()) {
3171        LOG.debug("running merge pass");
3172      }
3173      Writer writer = cloneFileAttributes(
3174                                          outFile.suffix(".0"), outFile, null);
3175      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3176                                    outFile.suffix(".0.index"), tmpDir);
3177      writeFile(r, writer);
3178
3179      writer.close();
3180      return 0;
3181    }
3182
3183    /** Used by mergePass to merge the output of the sort
3184     * @param inName the name of the input file containing sorted segments
3185     * @param indexIn the offsets of the sorted segments
3186     * @param tmpDir the relative directory to store intermediate results in
3187     * @return RawKeyValueIterator
3188     * @throws IOException
3189     */
3190    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3191      throws IOException {
3192      //get the segments from indexIn
      //we create a SegmentContainer so that we can track the segments belonging
      //to inName and delete inName as soon as all of its contained segments
      //have been consumed during the merge, at which point the file is no
      //longer needed
3197      SegmentContainer container = new SegmentContainer(inName, indexIn);
3198      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3199      return mQueue.merge();
3200    }
3201    
3202    /** This class implements the core of the merge logic */
3203    private class MergeQueue extends PriorityQueue 
3204      implements RawKeyValueIterator {
3205      private boolean compress;
3206      private boolean blockCompress;
3207      private DataOutputBuffer rawKey = new DataOutputBuffer();
3208      private ValueBytes rawValue;
3209      private long totalBytesProcessed;
3210      private float progPerByte;
3211      private Progress mergeProgress = new Progress();
3212      private Path tmpDir;
3213      private Progressable progress = null; //handle to the progress reporting object
3214      private SegmentDescriptor minSegment;
3215      
      //a TreeMap used to store the segments sorted by size (segment offset and
      //segment path name are used to break ties between segments of the same size)
3218      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3219        new TreeMap<SegmentDescriptor, Void>();
3220            
3221      @SuppressWarnings("unchecked")
3222      public void put(SegmentDescriptor stream) throws IOException {
3223        if (size() == 0) {
3224          compress = stream.in.isCompressed();
3225          blockCompress = stream.in.isBlockCompressed();
3226        } else if (compress != stream.in.isCompressed() || 
3227                   blockCompress != stream.in.isBlockCompressed()) {
3228          throw new IOException("All merged files must be compressed or not.");
3229        } 
3230        super.put(stream);
3231      }
3232      
3233      /**
3234       * A queue of file segments to merge
3235       * @param segments the file segments to merge
3236       * @param tmpDir a relative local directory to save intermediate files in
3237       * @param progress the reference to the Progressable object
3238       */
3239      public MergeQueue(List <SegmentDescriptor> segments,
3240          Path tmpDir, Progressable progress) {
3241        int size = segments.size();
3242        for (int i = 0; i < size; i++) {
3243          sortedSegmentSizes.put(segments.get(i), null);
3244        }
3245        this.tmpDir = tmpDir;
3246        this.progress = progress;
3247      }
3248      @Override
3249      protected boolean lessThan(Object a, Object b) {
3250        // indicate we're making progress
3251        if (progress != null) {
3252          progress.progress();
3253        }
3254        SegmentDescriptor msa = (SegmentDescriptor)a;
3255        SegmentDescriptor msb = (SegmentDescriptor)b;
3256        return comparator.compare(msa.getKey().getData(), 0, 
3257                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
3258                                  msb.getKey().getLength()) < 0;
3259      }
3260      @Override
3261      public void close() throws IOException {
3262        SegmentDescriptor ms;                           // close inputs
3263        while ((ms = (SegmentDescriptor)pop()) != null) {
3264          ms.cleanup();
3265        }
3266        minSegment = null;
3267      }
3268      @Override
3269      public DataOutputBuffer getKey() throws IOException {
3270        return rawKey;
3271      }
3272      @Override
3273      public ValueBytes getValue() throws IOException {
3274        return rawValue;
3275      }
3276      @Override
3277      public boolean next() throws IOException {
3278        if (size() == 0)
3279          return false;
3280        if (minSegment != null) {
3281          //minSegment is non-null for all invocations of next except the first
3282          //one. For the first invocation, the priority queue is ready for use
3283          //but for the subsequent invocations, first adjust the queue 
3284          adjustPriorityQueue(minSegment);
3285          if (size() == 0) {
3286            minSegment = null;
3287            return false;
3288          }
3289        }
3290        minSegment = (SegmentDescriptor)top();
3291        long startPos = minSegment.in.getPosition(); // Current position in stream
3292        //save the raw key reference
3293        rawKey = minSegment.getKey();
3294        //load the raw value. Re-use the existing rawValue buffer
3295        if (rawValue == null) {
3296          rawValue = minSegment.in.createValueBytes();
3297        }
3298        minSegment.nextRawValue(rawValue);
3299        long endPos = minSegment.in.getPosition(); // End position after reading value
3300        updateProgress(endPos - startPos);
3301        return true;
3302      }
3303      
3304      @Override
3305      public Progress getProgress() {
3306        return mergeProgress; 
3307      }
3308
3309      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3310        long startPos = ms.in.getPosition(); // Current position in stream
3311        boolean hasNext = ms.nextRawKey();
3312        long endPos = ms.in.getPosition(); // End position after reading key
3313        updateProgress(endPos - startPos);
3314        if (hasNext) {
3315          adjustTop();
3316        } else {
3317          pop();
3318          ms.cleanup();
3319        }
3320      }
3321
3322      private void updateProgress(long bytesProcessed) {
3323        totalBytesProcessed += bytesProcessed;
3324        if (progPerByte > 0) {
3325          mergeProgress.set(totalBytesProcessed * progPerByte);
3326        }
3327      }
3328      
3329      /** This is the single level merge that is called multiple times 
3330       * depending on the factor size and the number of segments
3331       * @return RawKeyValueIterator
3332       * @throws IOException
3333       */
3334      public RawKeyValueIterator merge() throws IOException {
3335        //create the MergeStreams from the sorted map created in the constructor
3336        //and dump the final output to a file
3337        int numSegments = sortedSegmentSizes.size();
3338        int origFactor = factor;
3339        int passNo = 1;
3340        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3341        do {
3342          //get the factor for this pass of merge
3343          factor = getPassFactor(passNo, numSegments);
3344          List<SegmentDescriptor> segmentsToMerge =
3345            new ArrayList<SegmentDescriptor>();
3346          int segmentsConsidered = 0;
3347          int numSegmentsToConsider = factor;
3348          while (true) {
3349            //extract the smallest 'factor' number of segment pointers from the 
3350            //TreeMap. Call cleanup on the empty segments (no key/value data)
3351            SegmentDescriptor[] mStream = 
3352              getSegmentDescriptors(numSegmentsToConsider);
3353            for (int i = 0; i < mStream.length; i++) {
3354              if (mStream[i].nextRawKey()) {
3355                segmentsToMerge.add(mStream[i]);
3356                segmentsConsidered++;
3357                // Count the fact that we read some bytes in calling nextRawKey()
3358                updateProgress(mStream[i].in.getPosition());
3359              }
3360              else {
3361                mStream[i].cleanup();
3362                numSegments--; //we ignore this segment for the merge
3363              }
3364            }
3365            //if we have the desired number of segments
3366            //or looked at all available segments, we break
3367            if (segmentsConsidered == factor || 
3368                sortedSegmentSizes.size() == 0) {
3369              break;
3370            }
3371              
3372            numSegmentsToConsider = factor - segmentsConsidered;
3373          }
3374          //feed the streams to the priority queue
3375          initialize(segmentsToMerge.size()); clear();
3376          for (int i = 0; i < segmentsToMerge.size(); i++) {
3377            put(segmentsToMerge.get(i));
3378          }
          //if the number of segments remaining is at most the factor, just
          //return the iterator; else do another single-level merge
3381          if (numSegments <= factor) {
3382            //calculate the length of the remaining segments. Required for 
3383            //calculating the merge progress
3384            long totalBytes = 0;
3385            for (int i = 0; i < segmentsToMerge.size(); i++) {
3386              totalBytes += segmentsToMerge.get(i).segmentLength;
3387            }
3388            if (totalBytes != 0) //being paranoid
3389              progPerByte = 1.0f / (float)totalBytes;
3390            //reset factor to what it originally was
3391            factor = origFactor;
3392            return this;
3393          } else {
3394            //we want to spread the creation of temp files on multiple disks if 
3395            //available under the space constraints
3396            long approxOutputSize = 0; 
3397            for (SegmentDescriptor s : segmentsToMerge) {
3398              approxOutputSize += s.segmentLength + 
3399                                  ChecksumFileSystem.getApproxChkSumLength(
3400                                  s.segmentLength);
3401            }
3402            Path tmpFilename = 
3403              new Path(tmpDir, "intermediate").suffix("." + passNo);
3404
3405            Path outputFile =  lDirAlloc.getLocalPathForWrite(
3406                                                tmpFilename.toString(),
3407                                                approxOutputSize, conf);
3408            if(LOG.isDebugEnabled()) { 
3409              LOG.debug("writing intermediate results to " + outputFile);
3410            }
3411            Writer writer = cloneFileAttributes(
3412                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3413                                                fs.makeQualified(outputFile), null);
3414            writer.sync = null; //disable sync for temp files
3415            writeFile(this, writer);
3416            writer.close();
3417            
3418            //we finished one single level merge; now clean up the priority 
3419            //queue
3420            this.close();
3421            
3422            SegmentDescriptor tempSegment = 
3423              new SegmentDescriptor(0,
3424                  fs.getFileStatus(outputFile).getLen(), outputFile);
3425            //put the segment back in the TreeMap
3426            sortedSegmentSizes.put(tempSegment, null);
3427            numSegments = sortedSegmentSizes.size();
3428            passNo++;
3429          }
          //only the first merge pass uses a reduced factor, so reset the
          //factor to what it originally was
3432          factor = origFactor;
3433        } while(true);
3434      }
3435  
      /** Computes the number of segments to merge in the given pass (see
       * HADOOP-591). On the first pass only, merge just enough segments so
       * that every later pass can be a full, factor-way merge.
       * @param passNo the merge pass number, starting at 1
       * @param numSegments the number of segments still to be merged
       * @return the merge factor to use for this pass
       */
      public int getPassFactor(int passNo, int numSegments) {
3438        if (passNo > 1 || numSegments <= factor || factor == 1) 
3439          return factor;
3440        int mod = (numSegments - 1) % (factor - 1);
3441        if (mod == 0)
3442          return factor;
3443        return mod + 1;
3444      }
3445      
3446      /** Return (& remove) the requested number of segment descriptors from the
3447       * sorted map.
3448       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] descriptors = 
          new SegmentDescriptor[numDescriptors];
        Iterator<SegmentDescriptor> iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          descriptors[i++] = iter.next();
          iter.remove();
        }
        return descriptors;
      }
3462    } // SequenceFile.Sorter.MergeQueue
3463
3464    /** This class defines a merge segment. This class can be subclassed to 
3465     * provide a customized cleanup method implementation. In this 
3466     * implementation, cleanup closes the file handle and deletes the file 
3467     */
3468    public class SegmentDescriptor implements Comparable {
3469      
3470      long segmentOffset; //the start of the segment in the file
3471      long segmentLength; //the length of the segment
3472      Path segmentPathName; //the path name of the file containing the segment
3473      boolean ignoreSync = true; //set to true for temp files
3474      private Reader in = null; 
3475      private DataOutputBuffer rawKey = null; //this will hold the current key
3476      private boolean preserveInput = false; //delete input segment files?
3477      
3478      /** Constructs a segment
3479       * @param segmentOffset the offset of the segment in the file
3480       * @param segmentLength the length of the segment
3481       * @param segmentPathName the path name of the file containing the segment
3482       */
3483      public SegmentDescriptor (long segmentOffset, long segmentLength, 
3484                                Path segmentPathName) {
3485        this.segmentOffset = segmentOffset;
3486        this.segmentLength = segmentLength;
3487        this.segmentPathName = segmentPathName;
3488      }
3489      
      /** Enables sync checks while reading this segment; temp files skip
       * them by default */
      public void doSync() { ignoreSync = false; }
3492      
      /** Sets whether the input segment file should be kept once the merge
       * no longer needs it
       * @param preserve true to keep the input file, false to delete it */
      public void preserveInput(boolean preserve) {
        preserveInput = preserve;
      }
3497
3498      public boolean shouldPreserveInput() {
3499        return preserveInput;
3500      }
3501      
3502      @Override
3503      public int compareTo(Object o) {
3504        SegmentDescriptor that = (SegmentDescriptor)o;
3505        if (this.segmentLength != that.segmentLength) {
3506          return (this.segmentLength < that.segmentLength ? -1 : 1);
3507        }
3508        if (this.segmentOffset != that.segmentOffset) {
3509          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3510        }
3511        return (this.segmentPathName.toString()).
3512          compareTo(that.segmentPathName.toString());
3513      }
3514
3515      @Override
3516      public boolean equals(Object o) {
3517        if (!(o instanceof SegmentDescriptor)) {
3518          return false;
3519        }
3520        SegmentDescriptor that = (SegmentDescriptor)o;
3521        if (this.segmentLength == that.segmentLength &&
3522            this.segmentOffset == that.segmentOffset &&
3523            this.segmentPathName.toString().equals(
3524              that.segmentPathName.toString())) {
3525          return true;
3526        }
3527        return false;
3528      }
3529
3530      @Override
3531      public int hashCode() {
3532        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3533      }
3534
3535      /** Fills up the rawKey object with the key returned by the Reader
3536       * @return true if there is a key returned; false, otherwise
3537       * @throws IOException
3538       */
3539      public boolean nextRawKey() throws IOException {
3540        if (in == null) {
3541          int bufferSize = getBufferSize(conf); 
3542          Reader reader = new Reader(conf,
3543                                     Reader.file(segmentPathName), 
3544                                     Reader.bufferSize(bufferSize),
3545                                     Reader.start(segmentOffset), 
3546                                     Reader.length(segmentLength));
3547        
3548          //sometimes we ignore syncs especially for temp merge files
3549          if (ignoreSync) reader.ignoreSync();
3550
3551          if (reader.getKeyClass() != keyClass)
3552            throw new IOException("wrong key class: " + reader.getKeyClass() +
3553                                  " is not " + keyClass);
3554          if (reader.getValueClass() != valClass)
3555            throw new IOException("wrong value class: "+reader.getValueClass()+
3556                                  " is not " + valClass);
3557          this.in = reader;
3558          rawKey = new DataOutputBuffer();
3559        }
3560        rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
        return (keyLength >= 0);
3564      }
3565
      /** Fills up the passed rawValue with the value corresponding to the key
       * read earlier
       * @param rawValue the ValueBytes object to fill with the raw value
       * @return the length of the value
       * @throws IOException
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        return in.nextRawValue(rawValue);
      }
3576      
3577      /** Returns the stored rawKey */
3578      public DataOutputBuffer getKey() {
3579        return rawKey;
3580      }
3581      
3582      /** closes the underlying reader */
3583      private void close() throws IOException {
3584        this.in.close();
3585        this.in = null;
3586      }
3587
3588      /** The default cleanup. Subclasses can override this with a custom 
3589       * cleanup 
3590       */
3591      public void cleanup() throws IOException {
3592        close();
3593        if (!preserveInput) {
3594          fs.delete(segmentPathName, true);
3595        }
3596      }
3597    } // SequenceFile.Sorter.SegmentDescriptor
3598    
    /** This class describes one of several segments that share a single
     *  backing file
     */
3602    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3603
3604      SegmentContainer parentContainer = null;
3605
3606      /** Constructs a segment
3607       * @param segmentOffset the offset of the segment in the file
3608       * @param segmentLength the length of the segment
3609       * @param segmentPathName the path name of the file containing the segment
3610       * @param parent the parent SegmentContainer that holds the segment
3611       */
3612      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3613                                       Path segmentPathName, SegmentContainer parent) {
3614        super(segmentOffset, segmentLength, segmentPathName);
3615        this.parentContainer = parent;
3616      }
      /** Closes the segment's reader and defers deletion of the shared
       * backing file to the parent container
       */
3620      @Override
3621      public void cleanup() throws IOException {
3622        super.close();
3623        if (super.shouldPreserveInput()) return;
3624        parentContainer.cleanup();
3625      }
3626      
3627      @Override
3628      public boolean equals(Object o) {
3629        if (!(o instanceof LinkedSegmentsDescriptor)) {
3630          return false;
3631        }
3632        return super.equals(o);
3633      }
3634    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3635
3636    /** The class that defines a container for segments to be merged. Primarily
3637     * required to delete temp files as soon as all the contained segments
3638     * have been looked at */
3639    private class SegmentContainer {
3640      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3641      private int numSegmentsContained; //# of segments contained
3642      private Path inName; //input file from where segments are created
3643      
3644      //the list of segments read from the file
3645      private ArrayList <SegmentDescriptor> segments = 
3646        new ArrayList <SegmentDescriptor>();
3647      /** This constructor is there primarily to serve the sort routine that 
3648       * generates a single output file with an associated index file */
3649      public SegmentContainer(Path inName, Path indexIn) throws IOException {
3650        //get the segments from indexIn
3651        FSDataInputStream fsIndexIn = fs.open(indexIn);
3652        long end = fs.getFileStatus(indexIn).getLen();
3653        while (fsIndexIn.getPos() < end) {
3654          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3655          long segmentLength = WritableUtils.readVLong(fsIndexIn);
3656          Path segmentName = inName;
3657          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3658                                                    segmentLength, segmentName, this));
3659        }
3660        fsIndexIn.close();
3661        fs.delete(indexIn, true);
3662        numSegmentsContained = segments.size();
3663        this.inName = inName;
3664      }
3665
3666      public List <SegmentDescriptor> getSegmentList() {
3667        return segments;
3668      }
3669      public void cleanup() throws IOException {
3670        numSegmentsCleanedUp++;
3671        if (numSegmentsCleanedUp == numSegmentsContained) {
3672          fs.delete(inName, true);
3673        }
3674      }
3675    } //SequenceFile.Sorter.SegmentContainer
3676
3677  } // SequenceFile.Sorter
3678
3679} // SequenceFile