001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022import java.util.*;
023import java.rmi.server.UID;
024import java.security.MessageDigest;
025import org.apache.commons.logging.*;
026import org.apache.hadoop.util.Options;
027import org.apache.hadoop.fs.*;
028import org.apache.hadoop.fs.Options.CreateOpts;
029import org.apache.hadoop.io.compress.CodecPool;
030import org.apache.hadoop.io.compress.CompressionCodec;
031import org.apache.hadoop.io.compress.CompressionInputStream;
032import org.apache.hadoop.io.compress.CompressionOutputStream;
033import org.apache.hadoop.io.compress.Compressor;
034import org.apache.hadoop.io.compress.Decompressor;
035import org.apache.hadoop.io.compress.DefaultCodec;
036import org.apache.hadoop.io.compress.GzipCodec;
037import org.apache.hadoop.io.compress.zlib.ZlibFactory;
038import org.apache.hadoop.io.serializer.Deserializer;
039import org.apache.hadoop.io.serializer.Serializer;
040import org.apache.hadoop.io.serializer.SerializationFactory;
041import org.apache.hadoop.classification.InterfaceAudience;
042import org.apache.hadoop.classification.InterfaceStability;
043import org.apache.hadoop.conf.*;
044import org.apache.hadoop.util.Progressable;
045import org.apache.hadoop.util.Progress;
046import org.apache.hadoop.util.ReflectionUtils;
047import org.apache.hadoop.util.NativeCodeLoader;
048import org.apache.hadoop.util.MergeSort;
049import org.apache.hadoop.util.PriorityQueue;
050
051/** 
052 * <code>SequenceFile</code>s are flat files consisting of binary key/value 
053 * pairs.
054 * 
055 * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
056 * {@link Sorter} classes for writing, reading and sorting respectively.</p>
057 * 
058 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
059 * {@link CompressionType} used to compress key/value pairs:
060 * <ol>
061 *   <li>
062 *   <code>Writer</code> : Uncompressed records.
063 *   </li>
064 *   <li>
065 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
066 *                                       values.
067 *   </li>
068 *   <li>
069 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys & 
070 *                                      values are collected in 'blocks' 
071 *                                      separately and compressed. The size of 
072 *                                      the 'block' is configurable.
073 * </ol>
074 * 
075 * <p>The actual compression algorithm used to compress key and/or values can be
076 * specified by using the appropriate {@link CompressionCodec}.</p>
077 * 
078 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
079 * provided by the <code>SequenceFile</code> to chose the preferred format.</p>
080 *
081 * <p>The {@link Reader} acts as the bridge and can read any of the above 
082 * <code>SequenceFile</code> formats.</p>
083 *
084 * <h4 id="Formats">SequenceFile Formats</h4>
085 * 
086 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
087 * depending on the <code>CompressionType</code> specified. All of them share a
088 * <a href="#Header">common header</a> described below.
089 * 
090 * <h5 id="Header">SequenceFile Header</h5>
091 * <ul>
092 *   <li>
093 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
094 *             version number (e.g. SEQ4 or SEQ6)
095 *   </li>
096 *   <li>
097 *   keyClassName -key class
098 *   </li>
099 *   <li>
100 *   valueClassName - value class
101 *   </li>
102 *   <li>
103 *   compression - A boolean which specifies if compression is turned on for 
104 *                 keys/values in this file.
105 *   </li>
106 *   <li>
107 *   blockCompression - A boolean which specifies if block-compression is 
108 *                      turned on for keys/values in this file.
109 *   </li>
110 *   <li>
111 *   compression codec - <code>CompressionCodec</code> class which is used for  
112 *                       compression of keys and/or values (if compression is 
113 *                       enabled).
114 *   </li>
115 *   <li>
116 *   metadata - {@link Metadata} for this file.
117 *   </li>
118 *   <li>
119 *   sync - A sync marker to denote end of the header.
120 *   </li>
121 * </ul>
122 * 
123 * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5>
124 * <ul>
125 * <li>
126 * <a href="#Header">Header</a>
127 * </li>
128 * <li>
129 * Record
130 *   <ul>
131 *     <li>Record length</li>
132 *     <li>Key length</li>
133 *     <li>Key</li>
134 *     <li>Value</li>
135 *   </ul>
136 * </li>
137 * <li>
138 * A sync-marker every few <code>100</code> bytes or so.
139 * </li>
140 * </ul>
141 *
142 * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
143 * <ul>
144 * <li>
145 * <a href="#Header">Header</a>
146 * </li>
147 * <li>
148 * Record
149 *   <ul>
150 *     <li>Record length</li>
151 *     <li>Key length</li>
152 *     <li>Key</li>
153 *     <li><i>Compressed</i> Value</li>
154 *   </ul>
155 * </li>
156 * <li>
157 * A sync-marker every few <code>100</code> bytes or so.
158 * </li>
159 * </ul>
160 * 
161 * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
162 * <ul>
163 * <li>
164 * <a href="#Header">Header</a>
165 * </li>
166 * <li>
167 * Record <i>Block</i>
168 *   <ul>
169 *     <li>Uncompressed number of records in the block</li>
170 *     <li>Compressed key-lengths block-size</li>
171 *     <li>Compressed key-lengths block</li>
172 *     <li>Compressed keys block-size</li>
173 *     <li>Compressed keys block</li>
174 *     <li>Compressed value-lengths block-size</li>
175 *     <li>Compressed value-lengths block</li>
176 *     <li>Compressed values block-size</li>
177 *     <li>Compressed values block</li>
178 *   </ul>
179 * </li>
180 * <li>
181 * A sync-marker every block.
182 * </li>
183 * </ul>
184 * 
185 * <p>The compressed blocks of key lengths and value lengths consist of the 
186 * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
187 * format.</p>
188 * 
189 * @see CompressionCodec
190 */
191@InterfaceAudience.Public
192@InterfaceStability.Stable
193public class SequenceFile {
194  private static final Log LOG = LogFactory.getLog(SequenceFile.class);
195
196  private SequenceFile() {}                         // no public ctor
197
198  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
199  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
200  private static final byte VERSION_WITH_METADATA = (byte)6;
201  private static byte[] VERSION = new byte[] {
202    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
203  };
204
205  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
206  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
207  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
208
209  /** The number of bytes between sync points.*/
210  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
211
212  /** 
213   * The compression type used to compress key/value pairs in the 
214   * {@link SequenceFile}.
215   * 
216   * @see SequenceFile.Writer
217   */
218  public static enum CompressionType {
219    /** Do not compress records. */
220    NONE, 
221    /** Compress values only, each separately. */
222    RECORD,
223    /** Compress sequences of records together in blocks. */
224    BLOCK
225  }
226
227  /**
228   * Get the compression type for the reduce outputs
229   * @param job the job config to look in
230   * @return the kind of compression to use
231   */
232  static public CompressionType getDefaultCompressionType(Configuration job) {
233    String name = job.get("io.seqfile.compression.type");
234    return name == null ? CompressionType.RECORD : 
235      CompressionType.valueOf(name);
236  }
237  
238  /**
239   * Set the default compression type for sequence files.
240   * @param job the configuration to modify
241   * @param val the new compression type (none, block, record)
242   */
243  static public void setDefaultCompressionType(Configuration job, 
244                                               CompressionType val) {
245    job.set("io.seqfile.compression.type", val.toString());
246  }
247
248  /**
249   * Create a new Writer with the given options.
250   * @param conf the configuration to use
251   * @param opts the options to create the file with
252   * @return a new Writer
253   * @throws IOException
254   */
255  public static Writer createWriter(Configuration conf, Writer.Option... opts
256                                    ) throws IOException {
257    Writer.CompressionOption compressionOption = 
258      Options.getOption(Writer.CompressionOption.class, opts);
259    CompressionType kind;
260    if (compressionOption != null) {
261      kind = compressionOption.getValue();
262    } else {
263      kind = getDefaultCompressionType(conf);
264      opts = Options.prependOptions(opts, Writer.compression(kind));
265    }
266    switch (kind) {
267      default:
268      case NONE:
269        return new Writer(conf, opts);
270      case RECORD:
271        return new RecordCompressWriter(conf, opts);
272      case BLOCK:
273        return new BlockCompressWriter(conf, opts);
274    }
275  }
276
277  /**
278   * Construct the preferred type of SequenceFile Writer.
279   * @param fs The configured filesystem. 
280   * @param conf The configuration.
281   * @param name The name of the file. 
282   * @param keyClass The 'key' type.
283   * @param valClass The 'value' type.
284   * @return Returns the handle to the constructed SequenceFile Writer.
285   * @throws IOException
286   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
287   *     instead.
288   */
289  @Deprecated
290  public static Writer 
291    createWriter(FileSystem fs, Configuration conf, Path name, 
292                 Class keyClass, Class valClass) throws IOException {
293    return createWriter(conf, Writer.filesystem(fs),
294                        Writer.file(name), Writer.keyClass(keyClass),
295                        Writer.valueClass(valClass));
296  }
297  
298  /**
299   * Construct the preferred type of SequenceFile Writer.
300   * @param fs The configured filesystem. 
301   * @param conf The configuration.
302   * @param name The name of the file. 
303   * @param keyClass The 'key' type.
304   * @param valClass The 'value' type.
305   * @param compressionType The compression type.
306   * @return Returns the handle to the constructed SequenceFile Writer.
307   * @throws IOException
308   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
309   *     instead.
310   */
311  @Deprecated
312  public static Writer 
313    createWriter(FileSystem fs, Configuration conf, Path name, 
314                 Class keyClass, Class valClass, 
315                 CompressionType compressionType) throws IOException {
316    return createWriter(conf, Writer.filesystem(fs),
317                        Writer.file(name), Writer.keyClass(keyClass),
318                        Writer.valueClass(valClass), 
319                        Writer.compression(compressionType));
320  }
321  
322  /**
323   * Construct the preferred type of SequenceFile Writer.
324   * @param fs The configured filesystem. 
325   * @param conf The configuration.
326   * @param name The name of the file. 
327   * @param keyClass The 'key' type.
328   * @param valClass The 'value' type.
329   * @param compressionType The compression type.
330   * @param progress The Progressable object to track progress.
331   * @return Returns the handle to the constructed SequenceFile Writer.
332   * @throws IOException
333   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
334   *     instead.
335   */
336  @Deprecated
337  public static Writer
338    createWriter(FileSystem fs, Configuration conf, Path name, 
339                 Class keyClass, Class valClass, CompressionType compressionType,
340                 Progressable progress) throws IOException {
341    return createWriter(conf, Writer.file(name),
342                        Writer.filesystem(fs),
343                        Writer.keyClass(keyClass),
344                        Writer.valueClass(valClass), 
345                        Writer.compression(compressionType),
346                        Writer.progressable(progress));
347  }
348
349  /**
350   * Construct the preferred type of SequenceFile Writer.
351   * @param fs The configured filesystem. 
352   * @param conf The configuration.
353   * @param name The name of the file. 
354   * @param keyClass The 'key' type.
355   * @param valClass The 'value' type.
356   * @param compressionType The compression type.
357   * @param codec The compression codec.
358   * @return Returns the handle to the constructed SequenceFile Writer.
359   * @throws IOException
360   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
361   *     instead.
362   */
363  @Deprecated
364  public static Writer 
365    createWriter(FileSystem fs, Configuration conf, Path name, 
366                 Class keyClass, Class valClass, CompressionType compressionType, 
367                 CompressionCodec codec) throws IOException {
368    return createWriter(conf, Writer.file(name),
369                        Writer.filesystem(fs),
370                        Writer.keyClass(keyClass),
371                        Writer.valueClass(valClass), 
372                        Writer.compression(compressionType, codec));
373  }
374  
375  /**
376   * Construct the preferred type of SequenceFile Writer.
377   * @param fs The configured filesystem. 
378   * @param conf The configuration.
379   * @param name The name of the file. 
380   * @param keyClass The 'key' type.
381   * @param valClass The 'value' type.
382   * @param compressionType The compression type.
383   * @param codec The compression codec.
384   * @param progress The Progressable object to track progress.
385   * @param metadata The metadata of the file.
386   * @return Returns the handle to the constructed SequenceFile Writer.
387   * @throws IOException
388   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
389   *     instead.
390   */
391  @Deprecated
392  public static Writer
393    createWriter(FileSystem fs, Configuration conf, Path name, 
394                 Class keyClass, Class valClass, 
395                 CompressionType compressionType, CompressionCodec codec,
396                 Progressable progress, Metadata metadata) throws IOException {
397    return createWriter(conf, Writer.file(name),
398                        Writer.filesystem(fs),
399                        Writer.keyClass(keyClass),
400                        Writer.valueClass(valClass),
401                        Writer.compression(compressionType, codec),
402                        Writer.progressable(progress),
403                        Writer.metadata(metadata));
404  }
405
406  /**
407   * Construct the preferred type of SequenceFile Writer.
408   * @param fs The configured filesystem.
409   * @param conf The configuration.
410   * @param name The name of the file.
411   * @param keyClass The 'key' type.
412   * @param valClass The 'value' type.
413   * @param bufferSize buffer size for the underlaying outputstream.
414   * @param replication replication factor for the file.
415   * @param blockSize block size for the file.
416   * @param compressionType The compression type.
417   * @param codec The compression codec.
418   * @param progress The Progressable object to track progress.
419   * @param metadata The metadata of the file.
420   * @return Returns the handle to the constructed SequenceFile Writer.
421   * @throws IOException
422   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
423   *     instead.
424   */
425  @Deprecated
426  public static Writer
427    createWriter(FileSystem fs, Configuration conf, Path name,
428                 Class keyClass, Class valClass, int bufferSize,
429                 short replication, long blockSize,
430                 CompressionType compressionType, CompressionCodec codec,
431                 Progressable progress, Metadata metadata) throws IOException {
432    return createWriter(conf, Writer.file(name),
433                        Writer.filesystem(fs),
434                        Writer.keyClass(keyClass),
435                        Writer.valueClass(valClass), 
436                        Writer.bufferSize(bufferSize), 
437                        Writer.replication(replication),
438                        Writer.blockSize(blockSize),
439                        Writer.compression(compressionType, codec),
440                        Writer.progressable(progress),
441                        Writer.metadata(metadata));
442  }
443
444  /**
445   * Construct the preferred type of SequenceFile Writer.
446   * @param fs The configured filesystem.
447   * @param conf The configuration.
448   * @param name The name of the file.
449   * @param keyClass The 'key' type.
450   * @param valClass The 'value' type.
451   * @param bufferSize buffer size for the underlaying outputstream.
452   * @param replication replication factor for the file.
453   * @param blockSize block size for the file.
454   * @param createParent create parent directory if non-existent
455   * @param compressionType The compression type.
456   * @param codec The compression codec.
457   * @param metadata The metadata of the file.
458   * @return Returns the handle to the constructed SequenceFile Writer.
459   * @throws IOException
460   */
461  @Deprecated
462  public static Writer
463  createWriter(FileSystem fs, Configuration conf, Path name,
464               Class keyClass, Class valClass, int bufferSize,
465               short replication, long blockSize, boolean createParent,
466               CompressionType compressionType, CompressionCodec codec,
467               Metadata metadata) throws IOException {
468    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
469        conf, name, keyClass, valClass, compressionType, codec,
470        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
471        CreateOpts.bufferSize(bufferSize),
472        createParent ? CreateOpts.createParent()
473                     : CreateOpts.donotCreateParent(),
474        CreateOpts.repFac(replication),
475        CreateOpts.blockSize(blockSize)
476      );
477  }
478
479  /**
480   * Construct the preferred type of SequenceFile Writer.
481   * @param fc The context for the specified file.
482   * @param conf The configuration.
483   * @param name The name of the file.
484   * @param keyClass The 'key' type.
485   * @param valClass The 'value' type.
486   * @param compressionType The compression type.
487   * @param codec The compression codec.
488   * @param metadata The metadata of the file.
489   * @param createFlag gives the semantics of create: overwrite, append etc.
490   * @param opts file creation options; see {@link CreateOpts}.
491   * @return Returns the handle to the constructed SequenceFile Writer.
492   * @throws IOException
493   */
494  public static Writer
495  createWriter(FileContext fc, Configuration conf, Path name,
496               Class keyClass, Class valClass,
497               CompressionType compressionType, CompressionCodec codec,
498               Metadata metadata,
499               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
500               throws IOException {
501    return createWriter(conf, fc.create(name, createFlag, opts),
502          keyClass, valClass, compressionType, codec, metadata).ownStream();
503  }
504
505  /**
506   * Construct the preferred type of SequenceFile Writer.
507   * @param fs The configured filesystem. 
508   * @param conf The configuration.
509   * @param name The name of the file. 
510   * @param keyClass The 'key' type.
511   * @param valClass The 'value' type.
512   * @param compressionType The compression type.
513   * @param codec The compression codec.
514   * @param progress The Progressable object to track progress.
515   * @return Returns the handle to the constructed SequenceFile Writer.
516   * @throws IOException
517   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
518   *     instead.
519   */
520  @Deprecated
521  public static Writer
522    createWriter(FileSystem fs, Configuration conf, Path name, 
523                 Class keyClass, Class valClass, 
524                 CompressionType compressionType, CompressionCodec codec,
525                 Progressable progress) throws IOException {
526    return createWriter(conf, Writer.file(name),
527                        Writer.filesystem(fs),
528                        Writer.keyClass(keyClass),
529                        Writer.valueClass(valClass),
530                        Writer.compression(compressionType, codec),
531                        Writer.progressable(progress));
532  }
533
534  /**
535   * Construct the preferred type of 'raw' SequenceFile Writer.
536   * @param conf The configuration.
537   * @param out The stream on top which the writer is to be constructed.
538   * @param keyClass The 'key' type.
539   * @param valClass The 'value' type.
540   * @param compressionType The compression type.
541   * @param codec The compression codec.
542   * @param metadata The metadata of the file.
543   * @return Returns the handle to the constructed SequenceFile Writer.
544   * @throws IOException
545   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
546   *     instead.
547   */
548  @Deprecated
549  public static Writer
550    createWriter(Configuration conf, FSDataOutputStream out, 
551                 Class keyClass, Class valClass,
552                 CompressionType compressionType,
553                 CompressionCodec codec, Metadata metadata) throws IOException {
554    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
555                        Writer.valueClass(valClass), 
556                        Writer.compression(compressionType, codec),
557                        Writer.metadata(metadata));
558  }
559  
560  /**
561   * Construct the preferred type of 'raw' SequenceFile Writer.
562   * @param conf The configuration.
563   * @param out The stream on top which the writer is to be constructed.
564   * @param keyClass The 'key' type.
565   * @param valClass The 'value' type.
566   * @param compressionType The compression type.
567   * @param codec The compression codec.
568   * @return Returns the handle to the constructed SequenceFile Writer.
569   * @throws IOException
570   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
571   *     instead.
572   */
573  @Deprecated
574  public static Writer
575    createWriter(Configuration conf, FSDataOutputStream out, 
576                 Class keyClass, Class valClass, CompressionType compressionType,
577                 CompressionCodec codec) throws IOException {
578    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
579                        Writer.valueClass(valClass),
580                        Writer.compression(compressionType, codec));
581  }
582  
583
584  /** The interface to 'raw' values of SequenceFiles. */
585  public static interface ValueBytes {
586
587    /** Writes the uncompressed bytes to the outStream.
588     * @param outStream : Stream to write uncompressed bytes into.
589     * @throws IOException
590     */
591    public void writeUncompressedBytes(DataOutputStream outStream)
592      throws IOException;
593
594    /** Write compressed bytes to outStream. 
595     * Note: that it will NOT compress the bytes if they are not compressed.
596     * @param outStream : Stream to write compressed bytes into.
597     */
598    public void writeCompressedBytes(DataOutputStream outStream) 
599      throws IllegalArgumentException, IOException;
600
601    /**
602     * Size of stored data.
603     */
604    public int getSize();
605  }
606  
607  private static class UncompressedBytes implements ValueBytes {
608    private int dataSize;
609    private byte[] data;
610    
611    private UncompressedBytes() {
612      data = null;
613      dataSize = 0;
614    }
615    
616    private void reset(DataInputStream in, int length) throws IOException {
617      if (data == null) {
618        data = new byte[length];
619      } else if (length > data.length) {
620        data = new byte[Math.max(length, data.length * 2)];
621      }
622      dataSize = -1;
623      in.readFully(data, 0, length);
624      dataSize = length;
625    }
626    
627    public int getSize() {
628      return dataSize;
629    }
630    
631    public void writeUncompressedBytes(DataOutputStream outStream)
632      throws IOException {
633      outStream.write(data, 0, dataSize);
634    }
635
636    public void writeCompressedBytes(DataOutputStream outStream) 
637      throws IllegalArgumentException, IOException {
638      throw 
639        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
640    }
641
642  } // UncompressedBytes
643  
644  private static class CompressedBytes implements ValueBytes {
645    private int dataSize;
646    private byte[] data;
647    DataInputBuffer rawData = null;
648    CompressionCodec codec = null;
649    CompressionInputStream decompressedStream = null;
650
651    private CompressedBytes(CompressionCodec codec) {
652      data = null;
653      dataSize = 0;
654      this.codec = codec;
655    }
656
657    private void reset(DataInputStream in, int length) throws IOException {
658      if (data == null) {
659        data = new byte[length];
660      } else if (length > data.length) {
661        data = new byte[Math.max(length, data.length * 2)];
662      } 
663      dataSize = -1;
664      in.readFully(data, 0, length);
665      dataSize = length;
666    }
667    
668    public int getSize() {
669      return dataSize;
670    }
671    
672    public void writeUncompressedBytes(DataOutputStream outStream)
673      throws IOException {
674      if (decompressedStream == null) {
675        rawData = new DataInputBuffer();
676        decompressedStream = codec.createInputStream(rawData);
677      } else {
678        decompressedStream.resetState();
679      }
680      rawData.reset(data, 0, dataSize);
681
682      byte[] buffer = new byte[8192];
683      int bytesRead = 0;
684      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
685        outStream.write(buffer, 0, bytesRead);
686      }
687    }
688
689    public void writeCompressedBytes(DataOutputStream outStream) 
690      throws IllegalArgumentException, IOException {
691      outStream.write(data, 0, dataSize);
692    }
693
694  } // CompressedBytes
695  
696  /**
697   * The class encapsulating with the metadata of a file.
698   * The metadata of a file is a list of attribute name/value
699   * pairs of Text type.
700   *
701   */
702  public static class Metadata implements Writable {
703
704    private TreeMap<Text, Text> theMetadata;
705    
706    public Metadata() {
707      this(new TreeMap<Text, Text>());
708    }
709    
710    public Metadata(TreeMap<Text, Text> arg) {
711      if (arg == null) {
712        this.theMetadata = new TreeMap<Text, Text>();
713      } else {
714        this.theMetadata = arg;
715      }
716    }
717    
718    public Text get(Text name) {
719      return this.theMetadata.get(name);
720    }
721    
722    public void set(Text name, Text value) {
723      this.theMetadata.put(name, value);
724    }
725    
726    public TreeMap<Text, Text> getMetadata() {
727      return new TreeMap<Text, Text>(this.theMetadata);
728    }
729    
730    public void write(DataOutput out) throws IOException {
731      out.writeInt(this.theMetadata.size());
732      Iterator<Map.Entry<Text, Text>> iter =
733        this.theMetadata.entrySet().iterator();
734      while (iter.hasNext()) {
735        Map.Entry<Text, Text> en = iter.next();
736        en.getKey().write(out);
737        en.getValue().write(out);
738      }
739    }
740
741    public void readFields(DataInput in) throws IOException {
742      int sz = in.readInt();
743      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
744      this.theMetadata = new TreeMap<Text, Text>();
745      for (int i = 0; i < sz; i++) {
746        Text key = new Text();
747        Text val = new Text();
748        key.readFields(in);
749        val.readFields(in);
750        this.theMetadata.put(key, val);
751      }    
752    }
753
754    public boolean equals(Object other) {
755      if (other == null) {
756        return false;
757      }
758      if (other.getClass() != this.getClass()) {
759        return false;
760      } else {
761        return equals((Metadata)other);
762      }
763    }
764    
765    public boolean equals(Metadata other) {
766      if (other == null) return false;
767      if (this.theMetadata.size() != other.theMetadata.size()) {
768        return false;
769      }
770      Iterator<Map.Entry<Text, Text>> iter1 =
771        this.theMetadata.entrySet().iterator();
772      Iterator<Map.Entry<Text, Text>> iter2 =
773        other.theMetadata.entrySet().iterator();
774      while (iter1.hasNext() && iter2.hasNext()) {
775        Map.Entry<Text, Text> en1 = iter1.next();
776        Map.Entry<Text, Text> en2 = iter2.next();
777        if (!en1.getKey().equals(en2.getKey())) {
778          return false;
779        }
780        if (!en1.getValue().equals(en2.getValue())) {
781          return false;
782        }
783      }
784      if (iter1.hasNext() || iter2.hasNext()) {
785        return false;
786      }
787      return true;
788    }
789
790    public int hashCode() {
791      assert false : "hashCode not designed";
792      return 42; // any arbitrary constant will do 
793    }
794    
795    public String toString() {
796      StringBuilder sb = new StringBuilder();
797      sb.append("size: ").append(this.theMetadata.size()).append("\n");
798      Iterator<Map.Entry<Text, Text>> iter =
799        this.theMetadata.entrySet().iterator();
800      while (iter.hasNext()) {
801        Map.Entry<Text, Text> en = iter.next();
802        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
803        sb.append("\n");
804      }
805      return sb.toString();
806    }
807  }
808  
809  /** Write key/value pairs to a sequence-format file. */
810  public static class Writer implements java.io.Closeable {
811    private Configuration conf;
812    FSDataOutputStream out;
813    boolean ownOutputStream = true;
814    DataOutputBuffer buffer = new DataOutputBuffer();
815
816    Class keyClass;
817    Class valClass;
818
819    private final CompressionType compress;
820    CompressionCodec codec = null;
821    CompressionOutputStream deflateFilter = null;
822    DataOutputStream deflateOut = null;
823    Metadata metadata = null;
824    Compressor compressor = null;
825    
826    protected Serializer keySerializer;
827    protected Serializer uncompressedValSerializer;
828    protected Serializer compressedValSerializer;
829    
830    // Insert a globally unique 16-byte value every few entries, so that one
831    // can seek into the middle of a file and then synchronize with record
832    // starts and ends by scanning for this value.
833    long lastSyncPos;                     // position of last sync
834    byte[] sync;                          // 16 random bytes
835    {
836      try {                                       
837        MessageDigest digester = MessageDigest.getInstance("MD5");
838        long time = System.currentTimeMillis();
839        digester.update((new UID()+"@"+time).getBytes());
840        sync = digester.digest();
841      } catch (Exception e) {
842        throw new RuntimeException(e);
843      }
844    }
845
846    public static interface Option {}
847    
848    static class FileOption extends Options.PathOption 
849                                    implements Option {
850      FileOption(Path path) {
851        super(path);
852      }
853    }
854
855    /**
856     * @deprecated only used for backwards-compatibility in the createWriter methods
857     * that take FileSystem.
858     */
859    @Deprecated
860    private static class FileSystemOption implements Option {
861      private final FileSystem value;
862      protected FileSystemOption(FileSystem value) {
863        this.value = value;
864      }
865      public FileSystem getValue() {
866        return value;
867      }
868    }
869
870    static class StreamOption extends Options.FSDataOutputStreamOption 
871                              implements Option {
872      StreamOption(FSDataOutputStream stream) {
873        super(stream);
874      }
875    }
876
877    static class BufferSizeOption extends Options.IntegerOption
878                                  implements Option {
879      BufferSizeOption(int value) {
880        super(value);
881      }
882    }
883    
884    static class BlockSizeOption extends Options.LongOption implements Option {
885      BlockSizeOption(long value) {
886        super(value);
887      }
888    }
889
890    static class ReplicationOption extends Options.IntegerOption
891                                   implements Option {
892      ReplicationOption(int value) {
893        super(value);
894      }
895    }
896
897    static class KeyClassOption extends Options.ClassOption implements Option {
898      KeyClassOption(Class<?> value) {
899        super(value);
900      }
901    }
902
903    static class ValueClassOption extends Options.ClassOption
904                                          implements Option {
905      ValueClassOption(Class<?> value) {
906        super(value);
907      }
908    }
909
910    static class MetadataOption implements Option {
911      private final Metadata value;
912      MetadataOption(Metadata value) {
913        this.value = value;
914      }
915      Metadata getValue() {
916        return value;
917      }
918    }
919
920    static class ProgressableOption extends Options.ProgressableOption
921                                    implements Option {
922      ProgressableOption(Progressable value) {
923        super(value);
924      }
925    }
926
927    private static class CompressionOption implements Option {
928      private final CompressionType value;
929      private final CompressionCodec codec;
930      CompressionOption(CompressionType value) {
931        this(value, null);
932      }
933      CompressionOption(CompressionType value, CompressionCodec codec) {
934        this.value = value;
935        this.codec = (CompressionType.NONE != value && null == codec)
936          ? new DefaultCodec()
937          : codec;
938      }
939      CompressionType getValue() {
940        return value;
941      }
942      CompressionCodec getCodec() {
943        return codec;
944      }
945    }
946    
947    public static Option file(Path value) {
948      return new FileOption(value);
949    }
950
951    /**
952     * @deprecated only used for backwards-compatibility in the createWriter methods
953     * that take FileSystem.
954     */
955    @Deprecated
956    private static Option filesystem(FileSystem fs) {
957      return new SequenceFile.Writer.FileSystemOption(fs);
958    }
959    
960    public static Option bufferSize(int value) {
961      return new BufferSizeOption(value);
962    }
963    
964    public static Option stream(FSDataOutputStream value) {
965      return new StreamOption(value);
966    }
967    
968    public static Option replication(short value) {
969      return new ReplicationOption(value);
970    }
971    
972    public static Option blockSize(long value) {
973      return new BlockSizeOption(value);
974    }
975    
976    public static Option progressable(Progressable value) {
977      return new ProgressableOption(value);
978    }
979
980    public static Option keyClass(Class<?> value) {
981      return new KeyClassOption(value);
982    }
983    
984    public static Option valueClass(Class<?> value) {
985      return new ValueClassOption(value);
986    }
987    
988    public static Option metadata(Metadata value) {
989      return new MetadataOption(value);
990    }
991
992    public static Option compression(CompressionType value) {
993      return new CompressionOption(value);
994    }
995
996    public static Option compression(CompressionType value,
997        CompressionCodec codec) {
998      return new CompressionOption(value, codec);
999    }
1000    
1001    /**
1002     * Construct a uncompressed writer from a set of options.
1003     * @param conf the configuration to use
1004     * @param options the options used when creating the writer
1005     * @throws IOException if it fails
1006     */
1007    Writer(Configuration conf, 
1008           Option... opts) throws IOException {
1009      BlockSizeOption blockSizeOption = 
1010        Options.getOption(BlockSizeOption.class, opts);
1011      BufferSizeOption bufferSizeOption = 
1012        Options.getOption(BufferSizeOption.class, opts);
1013      ReplicationOption replicationOption = 
1014        Options.getOption(ReplicationOption.class, opts);
1015      ProgressableOption progressOption = 
1016        Options.getOption(ProgressableOption.class, opts);
1017      FileOption fileOption = Options.getOption(FileOption.class, opts);
1018      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
1019      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
1020      KeyClassOption keyClassOption = 
1021        Options.getOption(KeyClassOption.class, opts);
1022      ValueClassOption valueClassOption = 
1023        Options.getOption(ValueClassOption.class, opts);
1024      MetadataOption metadataOption = 
1025        Options.getOption(MetadataOption.class, opts);
1026      CompressionOption compressionTypeOption =
1027        Options.getOption(CompressionOption.class, opts);
1028      // check consistency of options
1029      if ((fileOption == null) == (streamOption == null)) {
1030        throw new IllegalArgumentException("file or stream must be specified");
1031      }
1032      if (fileOption == null && (blockSizeOption != null ||
1033                                 bufferSizeOption != null ||
1034                                 replicationOption != null ||
1035                                 progressOption != null)) {
1036        throw new IllegalArgumentException("file modifier options not " +
1037                                           "compatible with stream");
1038      }
1039
1040      FSDataOutputStream out;
1041      boolean ownStream = fileOption != null;
1042      if (ownStream) {
1043        Path p = fileOption.getValue();
1044        FileSystem fs;
1045        if (fsOption != null) {
1046          fs = fsOption.getValue();
1047        } else {
1048          fs = p.getFileSystem(conf);
1049        }
1050        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
1051          bufferSizeOption.getValue();
1052        short replication = replicationOption == null ? 
1053          fs.getDefaultReplication(p) :
1054          (short) replicationOption.getValue();
1055        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
1056          blockSizeOption.getValue();
1057        Progressable progress = progressOption == null ? null :
1058          progressOption.getValue();
1059        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
1060      } else {
1061        out = streamOption.getValue();
1062      }
1063      Class<?> keyClass = keyClassOption == null ?
1064          Object.class : keyClassOption.getValue();
1065      Class<?> valueClass = valueClassOption == null ?
1066          Object.class : valueClassOption.getValue();
1067      Metadata metadata = metadataOption == null ?
1068          new Metadata() : metadataOption.getValue();
1069      this.compress = compressionTypeOption.getValue();
1070      final CompressionCodec codec = compressionTypeOption.getCodec();
1071      if (codec != null &&
1072          (codec instanceof GzipCodec) &&
1073          !NativeCodeLoader.isNativeCodeLoaded() &&
1074          !ZlibFactory.isNativeZlibLoaded(conf)) {
1075        throw new IllegalArgumentException("SequenceFile doesn't work with " +
1076                                           "GzipCodec without native-hadoop " +
1077                                           "code!");
1078      }
1079      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
1080    }
1081
1082    /** Create the named file.
1083     * @deprecated Use 
1084     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1085     *   instead.
1086     */
1087    @Deprecated
1088    public Writer(FileSystem fs, Configuration conf, Path name, 
1089                  Class keyClass, Class valClass) throws IOException {
1090      this.compress = CompressionType.NONE;
1091      init(conf, fs.create(name), true, keyClass, valClass, null, 
1092           new Metadata());
1093    }
1094    
1095    /** Create the named file with write-progress reporter.
1096     * @deprecated Use 
1097     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1098     *   instead.
1099     */
1100    @Deprecated
1101    public Writer(FileSystem fs, Configuration conf, Path name, 
1102                  Class keyClass, Class valClass,
1103                  Progressable progress, Metadata metadata) throws IOException {
1104      this.compress = CompressionType.NONE;
1105      init(conf, fs.create(name, progress), true, keyClass, valClass,
1106           null, metadata);
1107    }
1108    
1109    /** Create the named file with write-progress reporter. 
1110     * @deprecated Use 
1111     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1112     *   instead.
1113     */
1114    @Deprecated
1115    public Writer(FileSystem fs, Configuration conf, Path name,
1116                  Class keyClass, Class valClass,
1117                  int bufferSize, short replication, long blockSize,
1118                  Progressable progress, Metadata metadata) throws IOException {
1119      this.compress = CompressionType.NONE;
1120      init(conf,
1121           fs.create(name, true, bufferSize, replication, blockSize, progress),
1122           true, keyClass, valClass, null, metadata);
1123    }
1124
1125    boolean isCompressed() { return compress != CompressionType.NONE; }
1126    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
1127    
1128    Writer ownStream() { this.ownOutputStream = true; return this;  }
1129
1130    /** Write and flush the file header. */
1131    private void writeFileHeader() 
1132      throws IOException {
1133      out.write(VERSION);
1134      Text.writeString(out, keyClass.getName());
1135      Text.writeString(out, valClass.getName());
1136      
1137      out.writeBoolean(this.isCompressed());
1138      out.writeBoolean(this.isBlockCompressed());
1139      
1140      if (this.isCompressed()) {
1141        Text.writeString(out, (codec.getClass()).getName());
1142      }
1143      this.metadata.write(out);
1144      out.write(sync);                       // write the sync bytes
1145      out.flush();                           // flush header
1146    }
1147    
1148    /** Initialize. */
1149    @SuppressWarnings("unchecked")
1150    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
1151              Class keyClass, Class valClass,
1152              CompressionCodec codec, Metadata metadata) 
1153      throws IOException {
1154      this.conf = conf;
1155      this.out = out;
1156      this.ownOutputStream = ownStream;
1157      this.keyClass = keyClass;
1158      this.valClass = valClass;
1159      this.codec = codec;
1160      this.metadata = metadata;
1161      SerializationFactory serializationFactory = new SerializationFactory(conf);
1162      this.keySerializer = serializationFactory.getSerializer(keyClass);
1163      this.keySerializer.open(buffer);
1164      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
1165      this.uncompressedValSerializer.open(buffer);
1166      if (this.codec != null) {
1167        ReflectionUtils.setConf(this.codec, this.conf);
1168        this.compressor = CodecPool.getCompressor(this.codec);
1169        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
1170        this.deflateOut = 
1171          new DataOutputStream(new BufferedOutputStream(deflateFilter));
1172        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
1173        this.compressedValSerializer.open(deflateOut);
1174      }
1175      writeFileHeader();
1176    }
1177    
1178    /** Returns the class of keys in this file. */
1179    public Class getKeyClass() { return keyClass; }
1180
1181    /** Returns the class of values in this file. */
1182    public Class getValueClass() { return valClass; }
1183
1184    /** Returns the compression codec of data in this file. */
1185    public CompressionCodec getCompressionCodec() { return codec; }
1186    
1187    /** create a sync point */
1188    public void sync() throws IOException {
1189      if (sync != null && lastSyncPos != out.getPos()) {
1190        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
1191        out.write(sync);                          // write sync
1192        lastSyncPos = out.getPos();               // update lastSyncPos
1193      }
1194    }
1195
1196    /** flush all currently written data to the file system */
1197    public void syncFs() throws IOException {
1198      if (out != null) {
1199        out.sync();                               // flush contents to file system
1200      }
1201    }
1202
1203    /** Returns the configuration of this file. */
1204    Configuration getConf() { return conf; }
1205    
1206    /** Close the file. */
1207    public synchronized void close() throws IOException {
1208      keySerializer.close();
1209      uncompressedValSerializer.close();
1210      if (compressedValSerializer != null) {
1211        compressedValSerializer.close();
1212      }
1213
1214      CodecPool.returnCompressor(compressor);
1215      compressor = null;
1216      
1217      if (out != null) {
1218        
1219        // Close the underlying stream iff we own it...
1220        if (ownOutputStream) {
1221          out.close();
1222        } else {
1223          out.flush();
1224        }
1225        out = null;
1226      }
1227    }
1228
1229    synchronized void checkAndWriteSync() throws IOException {
1230      if (sync != null &&
1231          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
1232        sync();
1233      }
1234    }
1235
1236    /** Append a key/value pair. */
1237    public void append(Writable key, Writable val)
1238      throws IOException {
1239      append((Object) key, (Object) val);
1240    }
1241
1242    /** Append a key/value pair. */
1243    @SuppressWarnings("unchecked")
1244    public synchronized void append(Object key, Object val)
1245      throws IOException {
1246      if (key.getClass() != keyClass)
1247        throw new IOException("wrong key class: "+key.getClass().getName()
1248                              +" is not "+keyClass);
1249      if (val.getClass() != valClass)
1250        throw new IOException("wrong value class: "+val.getClass().getName()
1251                              +" is not "+valClass);
1252
1253      buffer.reset();
1254
1255      // Append the 'key'
1256      keySerializer.serialize(key);
1257      int keyLength = buffer.getLength();
1258      if (keyLength < 0)
1259        throw new IOException("negative length keys not allowed: " + key);
1260
1261      // Append the 'value'
1262      if (compress == CompressionType.RECORD) {
1263        deflateFilter.resetState();
1264        compressedValSerializer.serialize(val);
1265        deflateOut.flush();
1266        deflateFilter.finish();
1267      } else {
1268        uncompressedValSerializer.serialize(val);
1269      }
1270
1271      // Write the record out
1272      checkAndWriteSync();                                // sync
1273      out.writeInt(buffer.getLength());                   // total record length
1274      out.writeInt(keyLength);                            // key portion length
1275      out.write(buffer.getData(), 0, buffer.getLength()); // data
1276    }
1277
1278    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1279        int keyLength, ValueBytes val) throws IOException {
1280      if (keyLength < 0)
1281        throw new IOException("negative length keys not allowed: " + keyLength);
1282
1283      int valLength = val.getSize();
1284
1285      checkAndWriteSync();
1286      
1287      out.writeInt(keyLength+valLength);          // total record length
1288      out.writeInt(keyLength);                    // key portion length
1289      out.write(keyData, keyOffset, keyLength);   // key
1290      val.writeUncompressedBytes(out);            // value
1291    }
1292
1293    /** Returns the current length of the output file.
1294     *
1295     * <p>This always returns a synchronized position.  In other words,
1296     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
1297     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
1298     * the key may be earlier in the file than key last written when this
1299     * method was called (e.g., with block-compression, it may be the first key
1300     * in the block that was being written when this method was called).
1301     */
1302    public synchronized long getLength() throws IOException {
1303      return out.getPos();
1304    }
1305
1306  } // class Writer
1307
1308  /** Write key/compressed-value pairs to a sequence-format file. */
1309  static class RecordCompressWriter extends Writer {
1310    
1311    RecordCompressWriter(Configuration conf, 
1312                         Option... options) throws IOException {
1313      super(conf, options);
1314    }
1315
1316    /** Append a key/value pair. */
1317    @SuppressWarnings("unchecked")
1318    public synchronized void append(Object key, Object val)
1319      throws IOException {
1320      if (key.getClass() != keyClass)
1321        throw new IOException("wrong key class: "+key.getClass().getName()
1322                              +" is not "+keyClass);
1323      if (val.getClass() != valClass)
1324        throw new IOException("wrong value class: "+val.getClass().getName()
1325                              +" is not "+valClass);
1326
1327      buffer.reset();
1328
1329      // Append the 'key'
1330      keySerializer.serialize(key);
1331      int keyLength = buffer.getLength();
1332      if (keyLength < 0)
1333        throw new IOException("negative length keys not allowed: " + key);
1334
1335      // Compress 'value' and append it
1336      deflateFilter.resetState();
1337      compressedValSerializer.serialize(val);
1338      deflateOut.flush();
1339      deflateFilter.finish();
1340
1341      // Write the record out
1342      checkAndWriteSync();                                // sync
1343      out.writeInt(buffer.getLength());                   // total record length
1344      out.writeInt(keyLength);                            // key portion length
1345      out.write(buffer.getData(), 0, buffer.getLength()); // data
1346    }
1347
1348    /** Append a key/value pair. */
1349    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1350        int keyLength, ValueBytes val) throws IOException {
1351
1352      if (keyLength < 0)
1353        throw new IOException("negative length keys not allowed: " + keyLength);
1354
1355      int valLength = val.getSize();
1356      
1357      checkAndWriteSync();                        // sync
1358      out.writeInt(keyLength+valLength);          // total record length
1359      out.writeInt(keyLength);                    // key portion length
1360      out.write(keyData, keyOffset, keyLength);   // 'key' data
1361      val.writeCompressedBytes(out);              // 'value' data
1362    }
1363    
1364  } // RecordCompressionWriter
1365
1366  /** Write compressed key/value blocks to a sequence-format file. */
1367  static class BlockCompressWriter extends Writer {
1368    
1369    private int noBufferedRecords = 0;
1370    
1371    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1372    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1373
1374    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1375    private DataOutputBuffer valBuffer = new DataOutputBuffer();
1376
1377    private final int compressionBlockSize;
1378    
1379    BlockCompressWriter(Configuration conf,
1380                        Option... options) throws IOException {
1381      super(conf, options);
1382      compressionBlockSize = 
1383        conf.getInt("io.seqfile.compress.blocksize", 1000000);
1384      keySerializer.close();
1385      keySerializer.open(keyBuffer);
1386      uncompressedValSerializer.close();
1387      uncompressedValSerializer.open(valBuffer);
1388    }
1389
1390    /** Workhorse to check and write out compressed data/lengths */
1391    private synchronized 
1392      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1393      throws IOException {
1394      deflateFilter.resetState();
1395      buffer.reset();
1396      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1397                       uncompressedDataBuffer.getLength());
1398      deflateOut.flush();
1399      deflateFilter.finish();
1400      
1401      WritableUtils.writeVInt(out, buffer.getLength());
1402      out.write(buffer.getData(), 0, buffer.getLength());
1403    }
1404    
1405    /** Compress and flush contents to dfs */
1406    public synchronized void sync() throws IOException {
1407      if (noBufferedRecords > 0) {
1408        super.sync();
1409        
1410        // No. of records
1411        WritableUtils.writeVInt(out, noBufferedRecords);
1412        
1413        // Write 'keys' and lengths
1414        writeBuffer(keyLenBuffer);
1415        writeBuffer(keyBuffer);
1416        
1417        // Write 'values' and lengths
1418        writeBuffer(valLenBuffer);
1419        writeBuffer(valBuffer);
1420        
1421        // Flush the file-stream
1422        out.flush();
1423        
1424        // Reset internal states
1425        keyLenBuffer.reset();
1426        keyBuffer.reset();
1427        valLenBuffer.reset();
1428        valBuffer.reset();
1429        noBufferedRecords = 0;
1430      }
1431      
1432    }
1433    
1434    /** Close the file. */
1435    public synchronized void close() throws IOException {
1436      if (out != null) {
1437        sync();
1438      }
1439      super.close();
1440    }
1441
1442    /** Append a key/value pair. */
1443    @SuppressWarnings("unchecked")
1444    public synchronized void append(Object key, Object val)
1445      throws IOException {
1446      if (key.getClass() != keyClass)
1447        throw new IOException("wrong key class: "+key+" is not "+keyClass);
1448      if (val.getClass() != valClass)
1449        throw new IOException("wrong value class: "+val+" is not "+valClass);
1450
1451      // Save key/value into respective buffers 
1452      int oldKeyLength = keyBuffer.getLength();
1453      keySerializer.serialize(key);
1454      int keyLength = keyBuffer.getLength() - oldKeyLength;
1455      if (keyLength < 0)
1456        throw new IOException("negative length keys not allowed: " + key);
1457      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1458
1459      int oldValLength = valBuffer.getLength();
1460      uncompressedValSerializer.serialize(val);
1461      int valLength = valBuffer.getLength() - oldValLength;
1462      WritableUtils.writeVInt(valLenBuffer, valLength);
1463      
1464      // Added another key/value pair
1465      ++noBufferedRecords;
1466      
1467      // Compress and flush?
1468      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1469      if (currentBlockSize >= compressionBlockSize) {
1470        sync();
1471      }
1472    }
1473    
1474    /** Append a key/value pair. */
1475    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1476        int keyLength, ValueBytes val) throws IOException {
1477      
1478      if (keyLength < 0)
1479        throw new IOException("negative length keys not allowed");
1480
1481      int valLength = val.getSize();
1482      
1483      // Save key/value data in relevant buffers
1484      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1485      keyBuffer.write(keyData, keyOffset, keyLength);
1486      WritableUtils.writeVInt(valLenBuffer, valLength);
1487      val.writeUncompressedBytes(valBuffer);
1488
1489      // Added another key/value pair
1490      ++noBufferedRecords;
1491
1492      // Compress and flush?
1493      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1494      if (currentBlockSize >= compressionBlockSize) {
1495        sync();
1496      }
1497    }
1498  
1499  } // BlockCompressionWriter
1500
1501  /** Get the configured buffer size */
1502  private static int getBufferSize(Configuration conf) {
1503    return conf.getInt("io.file.buffer.size", 4096);
1504  }
1505
1506  /** Reads key/value pairs from a sequence-format file. */
1507  public static class Reader implements java.io.Closeable {
1508    private String filename;
1509    private FSDataInputStream in;
1510    private DataOutputBuffer outBuf = new DataOutputBuffer();
1511
1512    private byte version;
1513
1514    private String keyClassName;
1515    private String valClassName;
1516    private Class keyClass;
1517    private Class valClass;
1518
1519    private CompressionCodec codec = null;
1520    private Metadata metadata = null;
1521    
1522    private byte[] sync = new byte[SYNC_HASH_SIZE];
1523    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1524    private boolean syncSeen;
1525
1526    private long headerEnd;
1527    private long end;
1528    private int keyLength;
1529    private int recordLength;
1530
1531    private boolean decompress;
1532    private boolean blockCompressed;
1533    
1534    private Configuration conf;
1535
1536    private int noBufferedRecords = 0;
1537    private boolean lazyDecompress = true;
1538    private boolean valuesDecompressed = true;
1539    
1540    private int noBufferedKeys = 0;
1541    private int noBufferedValues = 0;
1542    
1543    private DataInputBuffer keyLenBuffer = null;
1544    private CompressionInputStream keyLenInFilter = null;
1545    private DataInputStream keyLenIn = null;
1546    private Decompressor keyLenDecompressor = null;
1547    private DataInputBuffer keyBuffer = null;
1548    private CompressionInputStream keyInFilter = null;
1549    private DataInputStream keyIn = null;
1550    private Decompressor keyDecompressor = null;
1551
1552    private DataInputBuffer valLenBuffer = null;
1553    private CompressionInputStream valLenInFilter = null;
1554    private DataInputStream valLenIn = null;
1555    private Decompressor valLenDecompressor = null;
1556    private DataInputBuffer valBuffer = null;
1557    private CompressionInputStream valInFilter = null;
1558    private DataInputStream valIn = null;
1559    private Decompressor valDecompressor = null;
1560    
1561    private Deserializer keyDeserializer;
1562    private Deserializer valDeserializer;
1563
1564    /**
1565     * A tag interface for all of the Reader options
1566     */
1567    public static interface Option {}
1568    
1569    /**
1570     * Create an option to specify the path name of the sequence file.
1571     * @param value the path to read
1572     * @return a new option
1573     */
1574    public static Option file(Path value) {
1575      return new FileOption(value);
1576    }
1577    
1578    /**
1579     * Create an option to specify the stream with the sequence file.
1580     * @param value the stream to read.
1581     * @return a new option
1582     */
1583    public static Option stream(FSDataInputStream value) {
1584      return new InputStreamOption(value);
1585    }
1586    
1587    /**
1588     * Create an option to specify the starting byte to read.
1589     * @param value the number of bytes to skip over
1590     * @return a new option
1591     */
1592    public static Option start(long value) {
1593      return new StartOption(value);
1594    }
1595    
1596    /**
1597     * Create an option to specify the number of bytes to read.
1598     * @param value the number of bytes to read
1599     * @return a new option
1600     */
1601    public static Option length(long value) {
1602      return new LengthOption(value);
1603    }
1604    
1605    /**
1606     * Create an option with the buffer size for reading the given pathname.
1607     * @param value the number of bytes to buffer
1608     * @return a new option
1609     */
1610    public static Option bufferSize(int value) {
1611      return new BufferSizeOption(value);
1612    }
1613
1614    private static class FileOption extends Options.PathOption 
1615                                    implements Option {
1616      private FileOption(Path value) {
1617        super(value);
1618      }
1619    }
1620    
1621    private static class InputStreamOption
1622        extends Options.FSDataInputStreamOption 
1623        implements Option {
1624      private InputStreamOption(FSDataInputStream value) {
1625        super(value);
1626      }
1627    }
1628
1629    private static class StartOption extends Options.LongOption
1630                                     implements Option {
1631      private StartOption(long value) {
1632        super(value);
1633      }
1634    }
1635
1636    private static class LengthOption extends Options.LongOption
1637                                      implements Option {
1638      private LengthOption(long value) {
1639        super(value);
1640      }
1641    }
1642
1643    private static class BufferSizeOption extends Options.IntegerOption
1644                                      implements Option {
1645      private BufferSizeOption(int value) {
1646        super(value);
1647      }
1648    }
1649
1650    // only used directly
1651    private static class OnlyHeaderOption extends Options.BooleanOption 
1652                                          implements Option {
1653      private OnlyHeaderOption() {
1654        super(true);
1655      }
1656    }
1657
1658    public Reader(Configuration conf, Option... opts) throws IOException {
1659      // Look up the options, these are null if not set
1660      FileOption fileOpt = Options.getOption(FileOption.class, opts);
1661      InputStreamOption streamOpt = 
1662        Options.getOption(InputStreamOption.class, opts);
1663      StartOption startOpt = Options.getOption(StartOption.class, opts);
1664      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1665      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1666      OnlyHeaderOption headerOnly = 
1667        Options.getOption(OnlyHeaderOption.class, opts);
1668      // check for consistency
1669      if ((fileOpt == null) == (streamOpt == null)) {
1670        throw new 
1671          IllegalArgumentException("File or stream option must be specified");
1672      }
1673      if (fileOpt == null && bufOpt != null) {
1674        throw new IllegalArgumentException("buffer size can only be set when" +
1675                                           " a file is specified.");
1676      }
1677      // figure out the real values
1678      Path filename = null;
1679      FSDataInputStream file;
1680      final long len;
1681      if (fileOpt != null) {
1682        filename = fileOpt.getValue();
1683        FileSystem fs = filename.getFileSystem(conf);
1684        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1685        len = null == lenOpt
1686          ? fs.getFileStatus(filename).getLen()
1687          : lenOpt.getValue();
1688        file = openFile(fs, filename, bufSize, len);
1689      } else {
1690        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1691        file = streamOpt.getValue();
1692      }
1693      long start = startOpt == null ? 0 : startOpt.getValue();
1694      // really set up
1695      initialize(filename, file, start, len, conf, headerOnly != null);
1696    }
1697
1698    /**
1699     * Construct a reader by opening a file from the given file system.
1700     * @param fs The file system used to open the file.
1701     * @param file The file being read.
1702     * @param conf Configuration
1703     * @throws IOException
1704     * @deprecated Use Reader(Configuration, Option...) instead.
1705     */
1706    @Deprecated
1707    public Reader(FileSystem fs, Path file, 
1708                  Configuration conf) throws IOException {
1709      this(conf, file(file.makeQualified(fs)));
1710    }
1711
1712    /**
1713     * Construct a reader by the given input stream.
1714     * @param in An input stream.
1715     * @param buffersize unused
1716     * @param start The starting position.
1717     * @param length The length being read.
1718     * @param conf Configuration
1719     * @throws IOException
1720     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1721     */
1722    @Deprecated
1723    public Reader(FSDataInputStream in, int buffersize,
1724        long start, long length, Configuration conf) throws IOException {
1725      this(conf, stream(in), start(start), length(length));
1726    }
1727
1728    /** Common work of the constructors. */
1729    private void initialize(Path filename, FSDataInputStream in,
1730                            long start, long length, Configuration conf,
1731                            boolean tempReader) throws IOException {
1732      if (in == null) {
1733        throw new IllegalArgumentException("in == null");
1734      }
1735      this.filename = filename == null ? "<unknown>" : filename.toString();
1736      this.in = in;
1737      this.conf = conf;
1738      boolean succeeded = false;
1739      try {
1740        seek(start);
1741        this.end = this.in.getPos() + length;
1742        // if it wrapped around, use the max
1743        if (end < length) {
1744          end = Long.MAX_VALUE;
1745        }
1746        init(tempReader);
1747        succeeded = true;
1748      } finally {
1749        if (!succeeded) {
1750          IOUtils.cleanup(LOG, this.in);
1751        }
1752      }
1753    }
1754
1755    /**
1756     * Override this method to specialize the type of
1757     * {@link FSDataInputStream} returned.
1758     * @param fs The file system used to open the file.
1759     * @param file The file being read.
1760     * @param bufferSize The buffer size used to read the file.
1761     * @param length The length being read if it is >= 0.  Otherwise,
1762     *               the length is not available.
1763     * @return The opened stream.
1764     * @throws IOException
1765     */
1766    protected FSDataInputStream openFile(FileSystem fs, Path file,
1767        int bufferSize, long length) throws IOException {
1768      return fs.open(file, bufferSize);
1769    }
1770    
1771    /**
1772     * Initialize the {@link Reader}
1773     * @param tmpReader <code>true</code> if we are constructing a temporary
1774     *                  reader {@link SequenceFile.Sorter.cloneFileAttributes}, 
1775     *                  and hence do not initialize every component; 
1776     *                  <code>false</code> otherwise.
1777     * @throws IOException
1778     */
1779    private void init(boolean tempReader) throws IOException {
1780      byte[] versionBlock = new byte[VERSION.length];
1781      in.readFully(versionBlock);
1782
1783      if ((versionBlock[0] != VERSION[0]) ||
1784          (versionBlock[1] != VERSION[1]) ||
1785          (versionBlock[2] != VERSION[2]))
1786        throw new IOException(this + " not a SequenceFile");
1787
1788      // Set 'version'
1789      version = versionBlock[3];
1790      if (version > VERSION[3])
1791        throw new VersionMismatchException(VERSION[3], version);
1792
1793      if (version < BLOCK_COMPRESS_VERSION) {
1794        UTF8 className = new UTF8();
1795
1796        className.readFields(in);
1797        keyClassName = className.toString(); // key class name
1798
1799        className.readFields(in);
1800        valClassName = className.toString(); // val class name
1801      } else {
1802        keyClassName = Text.readString(in);
1803        valClassName = Text.readString(in);
1804      }
1805
1806      if (version > 2) {                          // if version > 2
1807        this.decompress = in.readBoolean();       // is compressed?
1808      } else {
1809        decompress = false;
1810      }
1811
1812      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1813        this.blockCompressed = in.readBoolean();  // is block-compressed?
1814      } else {
1815        blockCompressed = false;
1816      }
1817      
1818      // if version >= 5
1819      // setup the compression codec
1820      if (decompress) {
1821        if (version >= CUSTOM_COMPRESS_VERSION) {
1822          String codecClassname = Text.readString(in);
1823          try {
1824            Class<? extends CompressionCodec> codecClass
1825              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1826            this.codec = ReflectionUtils.newInstance(codecClass, conf);
1827          } catch (ClassNotFoundException cnfe) {
1828            throw new IllegalArgumentException("Unknown codec: " + 
1829                                               codecClassname, cnfe);
1830          }
1831        } else {
1832          codec = new DefaultCodec();
1833          ((Configurable)codec).setConf(conf);
1834        }
1835      }
1836      
1837      this.metadata = new Metadata();
1838      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1839        this.metadata.readFields(in);
1840      }
1841      
1842      if (version > 1) {                          // if version > 1
1843        in.readFully(sync);                       // read sync bytes
1844        headerEnd = in.getPos();                  // record end of header
1845      }
1846      
1847      // Initialize... *not* if this we are constructing a temporary Reader
1848      if (!tempReader) {
1849        valBuffer = new DataInputBuffer();
1850        if (decompress) {
1851          valDecompressor = CodecPool.getDecompressor(codec);
1852          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1853          valIn = new DataInputStream(valInFilter);
1854        } else {
1855          valIn = valBuffer;
1856        }
1857
1858        if (blockCompressed) {
1859          keyLenBuffer = new DataInputBuffer();
1860          keyBuffer = new DataInputBuffer();
1861          valLenBuffer = new DataInputBuffer();
1862
1863          keyLenDecompressor = CodecPool.getDecompressor(codec);
1864          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
1865                                                   keyLenDecompressor);
1866          keyLenIn = new DataInputStream(keyLenInFilter);
1867
1868          keyDecompressor = CodecPool.getDecompressor(codec);
1869          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
1870          keyIn = new DataInputStream(keyInFilter);
1871
1872          valLenDecompressor = CodecPool.getDecompressor(codec);
1873          valLenInFilter = codec.createInputStream(valLenBuffer, 
1874                                                   valLenDecompressor);
1875          valLenIn = new DataInputStream(valLenInFilter);
1876        }
1877        
1878        SerializationFactory serializationFactory =
1879          new SerializationFactory(conf);
1880        this.keyDeserializer =
1881          getDeserializer(serializationFactory, getKeyClass());
1882        if (!blockCompressed) {
1883          this.keyDeserializer.open(valBuffer);
1884        } else {
1885          this.keyDeserializer.open(keyIn);
1886        }
1887        this.valDeserializer =
1888          getDeserializer(serializationFactory, getValueClass());
1889        this.valDeserializer.open(valIn);
1890      }
1891    }
1892    
1893    @SuppressWarnings("unchecked")
1894    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
1895      return sf.getDeserializer(c);
1896    }
1897    
1898    /** Close the file. */
1899    public synchronized void close() throws IOException {
1900      // Return the decompressors to the pool
1901      CodecPool.returnDecompressor(keyLenDecompressor);
1902      CodecPool.returnDecompressor(keyDecompressor);
1903      CodecPool.returnDecompressor(valLenDecompressor);
1904      CodecPool.returnDecompressor(valDecompressor);
1905      keyLenDecompressor = keyDecompressor = null;
1906      valLenDecompressor = valDecompressor = null;
1907      
1908      if (keyDeserializer != null) {
1909        keyDeserializer.close();
1910      }
1911      if (valDeserializer != null) {
1912        valDeserializer.close();
1913      }
1914      
1915      // Close the input-stream
1916      in.close();
1917    }
1918
1919    /** Returns the name of the key class. */
1920    public String getKeyClassName() {
1921      return keyClassName;
1922    }
1923
1924    /** Returns the class of keys in this file. */
1925    public synchronized Class<?> getKeyClass() {
1926      if (null == keyClass) {
1927        try {
1928          keyClass = WritableName.getClass(getKeyClassName(), conf);
1929        } catch (IOException e) {
1930          throw new RuntimeException(e);
1931        }
1932      }
1933      return keyClass;
1934    }
1935
1936    /** Returns the name of the value class. */
1937    public String getValueClassName() {
1938      return valClassName;
1939    }
1940
1941    /** Returns the class of values in this file. */
1942    public synchronized Class<?> getValueClass() {
1943      if (null == valClass) {
1944        try {
1945          valClass = WritableName.getClass(getValueClassName(), conf);
1946        } catch (IOException e) {
1947          throw new RuntimeException(e);
1948        }
1949      }
1950      return valClass;
1951    }
1952
1953    /** Returns true if values are compressed. */
1954    public boolean isCompressed() { return decompress; }
1955    
1956    /** Returns true if records are block-compressed. */
1957    public boolean isBlockCompressed() { return blockCompressed; }
1958    
1959    /** Returns the compression codec of data in this file. */
1960    public CompressionCodec getCompressionCodec() { return codec; }
1961    
1962    /**
1963     * Get the compression type for this file.
1964     * @return the compression type
1965     */
1966    public CompressionType getCompressionType() {
1967      if (decompress) {
1968        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
1969      } else {
1970        return CompressionType.NONE;
1971      }
1972    }
1973
1974    /** Returns the metadata object of the file */
1975    public Metadata getMetadata() {
1976      return this.metadata;
1977    }
1978    
1979    /** Returns the configuration used for this file. */
1980    Configuration getConf() { return conf; }
1981    
1982    /** Read a compressed buffer */
1983    private synchronized void readBuffer(DataInputBuffer buffer, 
1984                                         CompressionInputStream filter) throws IOException {
1985      // Read data into a temporary buffer
1986      DataOutputBuffer dataBuffer = new DataOutputBuffer();
1987
1988      try {
1989        int dataBufferLength = WritableUtils.readVInt(in);
1990        dataBuffer.write(in, dataBufferLength);
1991      
1992        // Set up 'buffer' connected to the input-stream
1993        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
1994      } finally {
1995        dataBuffer.close();
1996      }
1997
1998      // Reset the codec
1999      filter.resetState();
2000    }
2001    
2002    /** Read the next 'compressed' block */
2003    private synchronized void readBlock() throws IOException {
2004      // Check if we need to throw away a whole block of 
2005      // 'values' due to 'lazy decompression' 
2006      if (lazyDecompress && !valuesDecompressed) {
2007        in.seek(WritableUtils.readVInt(in)+in.getPos());
2008        in.seek(WritableUtils.readVInt(in)+in.getPos());
2009      }
2010      
2011      // Reset internal states
2012      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2013      valuesDecompressed = false;
2014
2015      //Process sync
2016      if (sync != null) {
2017        in.readInt();
2018        in.readFully(syncCheck);                // read syncCheck
2019        if (!Arrays.equals(sync, syncCheck))    // check it
2020          throw new IOException("File is corrupt!");
2021      }
2022      syncSeen = true;
2023
2024      // Read number of records in this block
2025      noBufferedRecords = WritableUtils.readVInt(in);
2026      
2027      // Read key lengths and keys
2028      readBuffer(keyLenBuffer, keyLenInFilter);
2029      readBuffer(keyBuffer, keyInFilter);
2030      noBufferedKeys = noBufferedRecords;
2031      
2032      // Read value lengths and values
2033      if (!lazyDecompress) {
2034        readBuffer(valLenBuffer, valLenInFilter);
2035        readBuffer(valBuffer, valInFilter);
2036        noBufferedValues = noBufferedRecords;
2037        valuesDecompressed = true;
2038      }
2039    }
2040
2041    /** 
2042     * Position valLenIn/valIn to the 'value' 
2043     * corresponding to the 'current' key 
2044     */
2045    private synchronized void seekToCurrentValue() throws IOException {
2046      if (!blockCompressed) {
2047        if (decompress) {
2048          valInFilter.resetState();
2049        }
2050        valBuffer.reset();
2051      } else {
2052        // Check if this is the first value in the 'block' to be read
2053        if (lazyDecompress && !valuesDecompressed) {
2054          // Read the value lengths and values
2055          readBuffer(valLenBuffer, valLenInFilter);
2056          readBuffer(valBuffer, valInFilter);
2057          noBufferedValues = noBufferedRecords;
2058          valuesDecompressed = true;
2059        }
2060        
2061        // Calculate the no. of bytes to skip
2062        // Note: 'current' key has already been read!
2063        int skipValBytes = 0;
2064        int currentKey = noBufferedKeys + 1;          
2065        for (int i=noBufferedValues; i > currentKey; --i) {
2066          skipValBytes += WritableUtils.readVInt(valLenIn);
2067          --noBufferedValues;
2068        }
2069        
2070        // Skip to the 'val' corresponding to 'current' key
2071        if (skipValBytes > 0) {
2072          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2073            throw new IOException("Failed to seek to " + currentKey + 
2074                                  "(th) value!");
2075          }
2076        }
2077      }
2078    }
2079
2080    /**
2081     * Get the 'value' corresponding to the last read 'key'.
2082     * @param val : The 'value' to be read.
2083     * @throws IOException
2084     */
2085    public synchronized void getCurrentValue(Writable val) 
2086      throws IOException {
2087      if (val instanceof Configurable) {
2088        ((Configurable) val).setConf(this.conf);
2089      }
2090
2091      // Position stream to 'current' value
2092      seekToCurrentValue();
2093
2094      if (!blockCompressed) {
2095        val.readFields(valIn);
2096        
2097        if (valIn.read() > 0) {
2098          LOG.info("available bytes: " + valIn.available());
2099          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2100                                + " bytes, should read " +
2101                                (valBuffer.getLength()-keyLength));
2102        }
2103      } else {
2104        // Get the value
2105        int valLength = WritableUtils.readVInt(valLenIn);
2106        val.readFields(valIn);
2107        
2108        // Read another compressed 'value'
2109        --noBufferedValues;
2110        
2111        // Sanity check
2112        if ((valLength < 0) && LOG.isDebugEnabled()) {
2113          LOG.debug(val + " is a zero-length value");
2114        }
2115      }
2116
2117    }
2118    
2119    /**
2120     * Get the 'value' corresponding to the last read 'key'.
2121     * @param val : The 'value' to be read.
2122     * @throws IOException
2123     */
2124    public synchronized Object getCurrentValue(Object val) 
2125      throws IOException {
2126      if (val instanceof Configurable) {
2127        ((Configurable) val).setConf(this.conf);
2128      }
2129
2130      // Position stream to 'current' value
2131      seekToCurrentValue();
2132
2133      if (!blockCompressed) {
2134        val = deserializeValue(val);
2135        
2136        if (valIn.read() > 0) {
2137          LOG.info("available bytes: " + valIn.available());
2138          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2139                                + " bytes, should read " +
2140                                (valBuffer.getLength()-keyLength));
2141        }
2142      } else {
2143        // Get the value
2144        int valLength = WritableUtils.readVInt(valLenIn);
2145        val = deserializeValue(val);
2146        
2147        // Read another compressed 'value'
2148        --noBufferedValues;
2149        
2150        // Sanity check
2151        if ((valLength < 0) && LOG.isDebugEnabled()) {
2152          LOG.debug(val + " is a zero-length value");
2153        }
2154      }
2155      return val;
2156
2157    }
2158
2159    @SuppressWarnings("unchecked")
2160    private Object deserializeValue(Object val) throws IOException {
2161      return valDeserializer.deserialize(val);
2162    }
2163    
2164    /** Read the next key in the file into <code>key</code>, skipping its
2165     * value.  True if another entry exists, and false at end of file. */
2166    public synchronized boolean next(Writable key) throws IOException {
2167      if (key.getClass() != getKeyClass())
2168        throw new IOException("wrong key class: "+key.getClass().getName()
2169                              +" is not "+keyClass);
2170
2171      if (!blockCompressed) {
2172        outBuf.reset();
2173        
2174        keyLength = next(outBuf);
2175        if (keyLength < 0)
2176          return false;
2177        
2178        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2179        
2180        key.readFields(valBuffer);
2181        valBuffer.mark(0);
2182        if (valBuffer.getPosition() != keyLength)
2183          throw new IOException(key + " read " + valBuffer.getPosition()
2184                                + " bytes, should read " + keyLength);
2185      } else {
2186        //Reset syncSeen
2187        syncSeen = false;
2188        
2189        if (noBufferedKeys == 0) {
2190          try {
2191            readBlock();
2192          } catch (EOFException eof) {
2193            return false;
2194          }
2195        }
2196        
2197        int keyLength = WritableUtils.readVInt(keyLenIn);
2198        
2199        // Sanity check
2200        if (keyLength < 0) {
2201          return false;
2202        }
2203        
2204        //Read another compressed 'key'
2205        key.readFields(keyIn);
2206        --noBufferedKeys;
2207      }
2208
2209      return true;
2210    }
2211
2212    /** Read the next key/value pair in the file into <code>key</code> and
2213     * <code>val</code>.  Returns true if such a pair exists and false when at
2214     * end of file */
2215    public synchronized boolean next(Writable key, Writable val)
2216      throws IOException {
2217      if (val.getClass() != getValueClass())
2218        throw new IOException("wrong value class: "+val+" is not "+valClass);
2219
2220      boolean more = next(key);
2221      
2222      if (more) {
2223        getCurrentValue(val);
2224      }
2225
2226      return more;
2227    }
2228    
2229    /**
2230     * Read and return the next record length, potentially skipping over 
2231     * a sync block.
2232     * @return the length of the next record or -1 if there is no next record
2233     * @throws IOException
2234     */
2235    private synchronized int readRecordLength() throws IOException {
2236      if (in.getPos() >= end) {
2237        return -1;
2238      }      
2239      int length = in.readInt();
2240      if (version > 1 && sync != null &&
2241          length == SYNC_ESCAPE) {              // process a sync entry
2242        in.readFully(syncCheck);                // read syncCheck
2243        if (!Arrays.equals(sync, syncCheck))    // check it
2244          throw new IOException("File is corrupt!");
2245        syncSeen = true;
2246        if (in.getPos() >= end) {
2247          return -1;
2248        }
2249        length = in.readInt();                  // re-read length
2250      } else {
2251        syncSeen = false;
2252      }
2253      
2254      return length;
2255    }
2256    
2257    /** Read the next key/value pair in the file into <code>buffer</code>.
2258     * Returns the length of the key read, or -1 if at end of file.  The length
2259     * of the value may be computed by calling buffer.getLength() before and
2260     * after calls to this method. */
2261    /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2262    @Deprecated
2263    synchronized int next(DataOutputBuffer buffer) throws IOException {
2264      // Unsupported for block-compressed sequence files
2265      if (blockCompressed) {
2266        throw new IOException("Unsupported call for block-compressed" +
2267                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
2268      }
2269      try {
2270        int length = readRecordLength();
2271        if (length == -1) {
2272          return -1;
2273        }
2274        int keyLength = in.readInt();
2275        buffer.write(in, length);
2276        return keyLength;
2277      } catch (ChecksumException e) {             // checksum failure
2278        handleChecksumException(e);
2279        return next(buffer);
2280      }
2281    }
2282
2283    public ValueBytes createValueBytes() {
2284      ValueBytes val = null;
2285      if (!decompress || blockCompressed) {
2286        val = new UncompressedBytes();
2287      } else {
2288        val = new CompressedBytes(codec);
2289      }
2290      return val;
2291    }
2292
2293    /**
2294     * Read 'raw' records.
2295     * @param key - The buffer into which the key is read
2296     * @param val - The 'raw' value
2297     * @return Returns the total record length or -1 for end of file
2298     * @throws IOException
2299     */
2300    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2301      throws IOException {
2302      if (!blockCompressed) {
2303        int length = readRecordLength();
2304        if (length == -1) {
2305          return -1;
2306        }
2307        int keyLength = in.readInt();
2308        int valLength = length - keyLength;
2309        key.write(in, keyLength);
2310        if (decompress) {
2311          CompressedBytes value = (CompressedBytes)val;
2312          value.reset(in, valLength);
2313        } else {
2314          UncompressedBytes value = (UncompressedBytes)val;
2315          value.reset(in, valLength);
2316        }
2317        
2318        return length;
2319      } else {
2320        //Reset syncSeen
2321        syncSeen = false;
2322        
2323        // Read 'key'
2324        if (noBufferedKeys == 0) {
2325          if (in.getPos() >= end) 
2326            return -1;
2327
2328          try { 
2329            readBlock();
2330          } catch (EOFException eof) {
2331            return -1;
2332          }
2333        }
2334        int keyLength = WritableUtils.readVInt(keyLenIn);
2335        if (keyLength < 0) {
2336          throw new IOException("zero length key found!");
2337        }
2338        key.write(keyIn, keyLength);
2339        --noBufferedKeys;
2340        
2341        // Read raw 'value'
2342        seekToCurrentValue();
2343        int valLength = WritableUtils.readVInt(valLenIn);
2344        UncompressedBytes rawValue = (UncompressedBytes)val;
2345        rawValue.reset(valIn, valLength);
2346        --noBufferedValues;
2347        
2348        return (keyLength+valLength);
2349      }
2350      
2351    }
2352
2353    /**
2354     * Read 'raw' keys.
2355     * @param key - The buffer into which the key is read
2356     * @return Returns the key length or -1 for end of file
2357     * @throws IOException
2358     */
2359    public synchronized int nextRawKey(DataOutputBuffer key) 
2360      throws IOException {
2361      if (!blockCompressed) {
2362        recordLength = readRecordLength();
2363        if (recordLength == -1) {
2364          return -1;
2365        }
2366        keyLength = in.readInt();
2367        key.write(in, keyLength);
2368        return keyLength;
2369      } else {
2370        //Reset syncSeen
2371        syncSeen = false;
2372        
2373        // Read 'key'
2374        if (noBufferedKeys == 0) {
2375          if (in.getPos() >= end) 
2376            return -1;
2377
2378          try { 
2379            readBlock();
2380          } catch (EOFException eof) {
2381            return -1;
2382          }
2383        }
2384        int keyLength = WritableUtils.readVInt(keyLenIn);
2385        if (keyLength < 0) {
2386          throw new IOException("zero length key found!");
2387        }
2388        key.write(keyIn, keyLength);
2389        --noBufferedKeys;
2390        
2391        return keyLength;
2392      }
2393      
2394    }
2395
2396    /** Read the next key in the file, skipping its
2397     * value.  Return null at end of file. */
2398    public synchronized Object next(Object key) throws IOException {
2399      if (key != null && key.getClass() != getKeyClass()) {
2400        throw new IOException("wrong key class: "+key.getClass().getName()
2401                              +" is not "+keyClass);
2402      }
2403
2404      if (!blockCompressed) {
2405        outBuf.reset();
2406        
2407        keyLength = next(outBuf);
2408        if (keyLength < 0)
2409          return null;
2410        
2411        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2412        
2413        key = deserializeKey(key);
2414        valBuffer.mark(0);
2415        if (valBuffer.getPosition() != keyLength)
2416          throw new IOException(key + " read " + valBuffer.getPosition()
2417                                + " bytes, should read " + keyLength);
2418      } else {
2419        //Reset syncSeen
2420        syncSeen = false;
2421        
2422        if (noBufferedKeys == 0) {
2423          try {
2424            readBlock();
2425          } catch (EOFException eof) {
2426            return null;
2427          }
2428        }
2429        
2430        int keyLength = WritableUtils.readVInt(keyLenIn);
2431        
2432        // Sanity check
2433        if (keyLength < 0) {
2434          return null;
2435        }
2436        
2437        //Read another compressed 'key'
2438        key = deserializeKey(key);
2439        --noBufferedKeys;
2440      }
2441
2442      return key;
2443    }
2444
2445    @SuppressWarnings("unchecked")
2446    private Object deserializeKey(Object key) throws IOException {
2447      return keyDeserializer.deserialize(key);
2448    }
2449
2450    /**
2451     * Read 'raw' values.
2452     * @param val - The 'raw' value
2453     * @return Returns the value length
2454     * @throws IOException
2455     */
2456    public synchronized int nextRawValue(ValueBytes val) 
2457      throws IOException {
2458      
2459      // Position stream to current value
2460      seekToCurrentValue();
2461 
2462      if (!blockCompressed) {
2463        int valLength = recordLength - keyLength;
2464        if (decompress) {
2465          CompressedBytes value = (CompressedBytes)val;
2466          value.reset(in, valLength);
2467        } else {
2468          UncompressedBytes value = (UncompressedBytes)val;
2469          value.reset(in, valLength);
2470        }
2471         
2472        return valLength;
2473      } else {
2474        int valLength = WritableUtils.readVInt(valLenIn);
2475        UncompressedBytes rawValue = (UncompressedBytes)val;
2476        rawValue.reset(valIn, valLength);
2477        --noBufferedValues;
2478        return valLength;
2479      }
2480      
2481    }
2482
2483    private void handleChecksumException(ChecksumException e)
2484      throws IOException {
2485      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2486        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2487        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2488      } else {
2489        throw e;
2490      }
2491    }
2492
2493    /** disables sync. often invoked for tmp files */
2494    synchronized void ignoreSync() {
2495      sync = null;
2496    }
2497    
2498    /** Set the current byte position in the input file.
2499     *
2500     * <p>The position passed must be a position returned by {@link
2501     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2502     * position, use {@link SequenceFile.Reader#sync(long)}.
2503     */
2504    public synchronized void seek(long position) throws IOException {
2505      in.seek(position);
2506      if (blockCompressed) {                      // trigger block read
2507        noBufferedKeys = 0;
2508        valuesDecompressed = true;
2509      }
2510    }
2511
2512    /** Seek to the next sync mark past a given position.*/
2513    public synchronized void sync(long position) throws IOException {
2514      if (position+SYNC_SIZE >= end) {
2515        seek(end);
2516        return;
2517      }
2518
2519      if (position < headerEnd) {
2520        // seek directly to first record
2521        in.seek(headerEnd);
2522        // note the sync marker "seen" in the header
2523        syncSeen = true;
2524        return;
2525      }
2526
2527      try {
2528        seek(position+4);                         // skip escape
2529        in.readFully(syncCheck);
2530        int syncLen = sync.length;
2531        for (int i = 0; in.getPos() < end; i++) {
2532          int j = 0;
2533          for (; j < syncLen; j++) {
2534            if (sync[j] != syncCheck[(i+j)%syncLen])
2535              break;
2536          }
2537          if (j == syncLen) {
2538            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2539            return;
2540          }
2541          syncCheck[i%syncLen] = in.readByte();
2542        }
2543      } catch (ChecksumException e) {             // checksum failure
2544        handleChecksumException(e);
2545      }
2546    }
2547
2548    /** Returns true iff the previous call to next passed a sync mark.*/
2549    public synchronized boolean syncSeen() { return syncSeen; }
2550
2551    /** Return the current byte position in the input file. */
2552    public synchronized long getPosition() throws IOException {
2553      return in.getPos();
2554    }
2555
2556    /** Returns the name of the file. */
2557    public String toString() {
2558      return filename;
2559    }
2560
2561  }
2562
2563  /** Sorts key/value pairs in a sequence-format file.
2564   *
2565   * <p>For best performance, applications should make sure that the {@link
2566   * Writable#readFields(DataInput)} implementation of their keys is
2567   * very efficient.  In particular, it should avoid allocating memory.
2568   */
2569  public static class Sorter {
2570
2571    private RawComparator comparator;
2572
2573    private MergeSort mergeSort; //the implementation of merge sort
2574    
2575    private Path[] inFiles;                     // when merging or sorting
2576
2577    private Path outFile;
2578
2579    private int memory; // bytes
2580    private int factor; // merged per pass
2581
2582    private FileSystem fs = null;
2583
2584    private Class keyClass;
2585    private Class valClass;
2586
2587    private Configuration conf;
2588    private Metadata metadata;
2589    
2590    private Progressable progressable = null;
2591
2592    /** Sort and merge files containing the named classes. */
2593    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2594                  Class valClass, Configuration conf)  {
2595      this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
2596    }
2597
2598    /** Sort and merge using an arbitrary {@link RawComparator}. */
2599    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2600                  Class valClass, Configuration conf) {
2601      this(fs, comparator, keyClass, valClass, conf, new Metadata());
2602    }
2603
2604    /** Sort and merge using an arbitrary {@link RawComparator}. */
2605    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2606                  Class valClass, Configuration conf, Metadata metadata) {
2607      this.fs = fs;
2608      this.comparator = comparator;
2609      this.keyClass = keyClass;
2610      this.valClass = valClass;
2611      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2612      this.factor = conf.getInt("io.sort.factor", 100);
2613      this.conf = conf;
2614      this.metadata = metadata;
2615    }
2616
2617    /** Set the number of streams to merge at once.*/
2618    public void setFactor(int factor) { this.factor = factor; }
2619
2620    /** Get the number of streams to merge at once.*/
2621    public int getFactor() { return factor; }
2622
2623    /** Set the total amount of buffer memory, in bytes.*/
2624    public void setMemory(int memory) { this.memory = memory; }
2625
2626    /** Get the total amount of buffer memory, in bytes.*/
2627    public int getMemory() { return memory; }
2628
2629    /** Set the progressable object in order to report progress. */
2630    public void setProgressable(Progressable progressable) {
2631      this.progressable = progressable;
2632    }
2633    
2634    /** 
2635     * Perform a file sort from a set of input files into an output file.
2636     * @param inFiles the files to be sorted
2637     * @param outFile the sorted output file
2638     * @param deleteInput should the input files be deleted as they are read?
2639     */
2640    public void sort(Path[] inFiles, Path outFile,
2641                     boolean deleteInput) throws IOException {
2642      if (fs.exists(outFile)) {
2643        throw new IOException("already exists: " + outFile);
2644      }
2645
2646      this.inFiles = inFiles;
2647      this.outFile = outFile;
2648
2649      int segments = sortPass(deleteInput);
2650      if (segments > 1) {
2651        mergePass(outFile.getParent());
2652      }
2653    }
2654
2655    /** 
2656     * Perform a file sort from a set of input files and return an iterator.
2657     * @param inFiles the files to be sorted
2658     * @param tempDir the directory where temp files are created during sort
2659     * @param deleteInput should the input files be deleted as they are read?
2660     * @return iterator the RawKeyValueIterator
2661     */
2662    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2663                                              boolean deleteInput) throws IOException {
2664      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2665      if (fs.exists(outFile)) {
2666        throw new IOException("already exists: " + outFile);
2667      }
2668      this.inFiles = inFiles;
2669      //outFile will basically be used as prefix for temp files in the cases
2670      //where sort outputs multiple sorted segments. For the single segment
2671      //case, the outputFile itself will contain the sorted data for that
2672      //segment
2673      this.outFile = outFile;
2674
2675      int segments = sortPass(deleteInput);
2676      if (segments > 1)
2677        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2678                     tempDir);
2679      else if (segments == 1)
2680        return merge(new Path[]{outFile}, true, tempDir);
2681      else return null;
2682    }
2683
2684    /**
2685     * The backwards compatible interface to sort.
2686     * @param inFile the input file to sort
2687     * @param outFile the sorted output file
2688     */
2689    public void sort(Path inFile, Path outFile) throws IOException {
2690      sort(new Path[]{inFile}, outFile, false);
2691    }
2692    
2693    private int sortPass(boolean deleteInput) throws IOException {
2694      if(LOG.isDebugEnabled()) {
2695        LOG.debug("running sort pass");
2696      }
2697      SortPass sortPass = new SortPass();         // make the SortPass
2698      sortPass.setProgressable(progressable);
2699      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2700      try {
2701        return sortPass.run(deleteInput);         // run it
2702      } finally {
2703        sortPass.close();                         // close it
2704      }
2705    }
2706
2707    private class SortPass {
2708      private int memoryLimit = memory/4;
2709      private int recordLimit = 1000000;
2710      
2711      private DataOutputBuffer rawKeys = new DataOutputBuffer();
2712      private byte[] rawBuffer;
2713
2714      private int[] keyOffsets = new int[1024];
2715      private int[] pointers = new int[keyOffsets.length];
2716      private int[] pointersCopy = new int[keyOffsets.length];
2717      private int[] keyLengths = new int[keyOffsets.length];
2718      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2719      
2720      private ArrayList segmentLengths = new ArrayList();
2721      
2722      private Reader in = null;
2723      private FSDataOutputStream out = null;
2724      private FSDataOutputStream indexOut = null;
2725      private Path outName;
2726
2727      private Progressable progressable = null;
2728
2729      public int run(boolean deleteInput) throws IOException {
2730        int segments = 0;
2731        int currentFile = 0;
2732        boolean atEof = (currentFile >= inFiles.length);
2733        CompressionType compressionType;
2734        CompressionCodec codec = null;
2735        segmentLengths.clear();
2736        if (atEof) {
2737          return 0;
2738        }
2739        
2740        // Initialize
2741        in = new Reader(fs, inFiles[currentFile], conf);
2742        compressionType = in.getCompressionType();
2743        codec = in.getCompressionCodec();
2744        
2745        for (int i=0; i < rawValues.length; ++i) {
2746          rawValues[i] = null;
2747        }
2748        
2749        while (!atEof) {
2750          int count = 0;
2751          int bytesProcessed = 0;
2752          rawKeys.reset();
2753          while (!atEof && 
2754                 bytesProcessed < memoryLimit && count < recordLimit) {
2755
2756            // Read a record into buffer
2757            // Note: Attempt to re-use 'rawValue' as far as possible
2758            int keyOffset = rawKeys.getLength();       
2759            ValueBytes rawValue = 
2760              (count == keyOffsets.length || rawValues[count] == null) ? 
2761              in.createValueBytes() : 
2762              rawValues[count];
2763            int recordLength = in.nextRaw(rawKeys, rawValue);
2764            if (recordLength == -1) {
2765              in.close();
2766              if (deleteInput) {
2767                fs.delete(inFiles[currentFile], true);
2768              }
2769              currentFile += 1;
2770              atEof = currentFile >= inFiles.length;
2771              if (!atEof) {
2772                in = new Reader(fs, inFiles[currentFile], conf);
2773              } else {
2774                in = null;
2775              }
2776              continue;
2777            }
2778
2779            int keyLength = rawKeys.getLength() - keyOffset;
2780
2781            if (count == keyOffsets.length)
2782              grow();
2783
2784            keyOffsets[count] = keyOffset;                // update pointers
2785            pointers[count] = count;
2786            keyLengths[count] = keyLength;
2787            rawValues[count] = rawValue;
2788
2789            bytesProcessed += recordLength; 
2790            count++;
2791          }
2792
2793          // buffer is full -- sort & flush it
2794          if(LOG.isDebugEnabled()) {
2795            LOG.debug("flushing segment " + segments);
2796          }
2797          rawBuffer = rawKeys.getData();
2798          sort(count);
2799          // indicate we're making progress
2800          if (progressable != null) {
2801            progressable.progress();
2802          }
2803          flush(count, bytesProcessed, compressionType, codec, 
2804                segments==0 && atEof);
2805          segments++;
2806        }
2807        return segments;
2808      }
2809
2810      public void close() throws IOException {
2811        if (in != null) {
2812          in.close();
2813        }
2814        if (out != null) {
2815          out.close();
2816        }
2817        if (indexOut != null) {
2818          indexOut.close();
2819        }
2820      }
2821
2822      private void grow() {
2823        int newLength = keyOffsets.length * 3 / 2;
2824        keyOffsets = grow(keyOffsets, newLength);
2825        pointers = grow(pointers, newLength);
2826        pointersCopy = new int[newLength];
2827        keyLengths = grow(keyLengths, newLength);
2828        rawValues = grow(rawValues, newLength);
2829      }
2830
2831      private int[] grow(int[] old, int newLength) {
2832        int[] result = new int[newLength];
2833        System.arraycopy(old, 0, result, 0, old.length);
2834        return result;
2835      }
2836      
2837      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
2838        ValueBytes[] result = new ValueBytes[newLength];
2839        System.arraycopy(old, 0, result, 0, old.length);
2840        for (int i=old.length; i < newLength; ++i) {
2841          result[i] = null;
2842        }
2843        return result;
2844      }
2845
2846      private void flush(int count, int bytesProcessed, 
2847                         CompressionType compressionType, 
2848                         CompressionCodec codec, 
2849                         boolean done) throws IOException {
2850        if (out == null) {
2851          outName = done ? outFile : outFile.suffix(".0");
2852          out = fs.create(outName);
2853          if (!done) {
2854            indexOut = fs.create(outName.suffix(".index"));
2855          }
2856        }
2857
2858        long segmentStart = out.getPos();
2859        Writer writer = createWriter(conf, Writer.stream(out), 
2860            Writer.keyClass(keyClass), Writer.valueClass(valClass),
2861            Writer.compression(compressionType, codec),
2862            Writer.metadata(done ? metadata : new Metadata()));
2863        
2864        if (!done) {
2865          writer.sync = null;                     // disable sync on temp files
2866        }
2867
2868        for (int i = 0; i < count; i++) {         // write in sorted order
2869          int p = pointers[i];
2870          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
2871        }
2872        writer.close();
2873        
2874        if (!done) {
2875          // Save the segment length
2876          WritableUtils.writeVLong(indexOut, segmentStart);
2877          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
2878          indexOut.flush();
2879        }
2880      }
2881
2882      private void sort(int count) {
2883        System.arraycopy(pointers, 0, pointersCopy, 0, count);
2884        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
2885      }
2886      class SeqFileComparator implements Comparator<IntWritable> {
2887        public int compare(IntWritable I, IntWritable J) {
2888          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
2889                                    keyLengths[I.get()], rawBuffer, 
2890                                    keyOffsets[J.get()], keyLengths[J.get()]);
2891        }
2892      }
2893      
2894      /** set the progressable object in order to report progress */
2895      public void setProgressable(Progressable progressable)
2896      {
2897        this.progressable = progressable;
2898      }
2899      
2900    } // SequenceFile.Sorter.SortPass
2901
2902    /** The interface to iterate over raw keys/values of SequenceFiles. */
2903    public static interface RawKeyValueIterator {
2904      /** Gets the current raw key
2905       * @return DataOutputBuffer
2906       * @throws IOException
2907       */
2908      DataOutputBuffer getKey() throws IOException; 
2909      /** Gets the current raw value
2910       * @return ValueBytes 
2911       * @throws IOException
2912       */
2913      ValueBytes getValue() throws IOException; 
2914      /** Sets up the current key and value (for getKey and getValue)
2915       * @return true if there exists a key/value, false otherwise 
2916       * @throws IOException
2917       */
2918      boolean next() throws IOException;
2919      /** closes the iterator so that the underlying streams can be closed
2920       * @throws IOException
2921       */
2922      void close() throws IOException;
2923      /** Gets the Progress object; this has a float (0.0 - 1.0) 
2924       * indicating the bytes processed by the iterator so far
2925       */
2926      Progress getProgress();
2927    }    
2928    
2929    /**
2930     * Merges the list of segments of type <code>SegmentDescriptor</code>
2931     * @param segments the list of SegmentDescriptors
2932     * @param tmpDir the directory to write temporary files into
2933     * @return RawKeyValueIterator
2934     * @throws IOException
2935     */
2936    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
2937                                     Path tmpDir) 
2938      throws IOException {
2939      // pass in object to report progress, if present
2940      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
2941      return mQueue.merge();
2942    }
2943
2944    /**
2945     * Merges the contents of files passed in Path[] using a max factor value
2946     * that is already set
2947     * @param inNames the array of path names
2948     * @param deleteInputs true if the input files should be deleted when 
2949     * unnecessary
2950     * @param tmpDir the directory to write temporary files into
2951     * @return RawKeyValueIteratorMergeQueue
2952     * @throws IOException
2953     */
2954    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
2955                                     Path tmpDir) 
2956      throws IOException {
2957      return merge(inNames, deleteInputs, 
2958                   (inNames.length < factor) ? inNames.length : factor,
2959                   tmpDir);
2960    }
2961
2962    /**
2963     * Merges the contents of files passed in Path[]
2964     * @param inNames the array of path names
2965     * @param deleteInputs true if the input files should be deleted when 
2966     * unnecessary
2967     * @param factor the factor that will be used as the maximum merge fan-in
2968     * @param tmpDir the directory to write temporary files into
2969     * @return RawKeyValueIteratorMergeQueue
2970     * @throws IOException
2971     */
2972    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
2973                                     int factor, Path tmpDir) 
2974      throws IOException {
2975      //get the segments from inNames
2976      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
2977      for (int i = 0; i < inNames.length; i++) {
2978        SegmentDescriptor s = new SegmentDescriptor(0,
2979            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
2980        s.preserveInput(!deleteInputs);
2981        s.doSync();
2982        a.add(s);
2983      }
2984      this.factor = factor;
2985      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
2986      return mQueue.merge();
2987    }
2988
2989    /**
2990     * Merges the contents of files passed in Path[]
2991     * @param inNames the array of path names
2992     * @param tempDir the directory for creating temp files during merge
2993     * @param deleteInputs true if the input files should be deleted when 
2994     * unnecessary
2995     * @return RawKeyValueIteratorMergeQueue
2996     * @throws IOException
2997     */
2998    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
2999                                     boolean deleteInputs) 
3000      throws IOException {
3001      //outFile will basically be used as prefix for temp files for the
3002      //intermediate merge outputs           
3003      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3004      //get the segments from inNames
3005      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3006      for (int i = 0; i < inNames.length; i++) {
3007        SegmentDescriptor s = new SegmentDescriptor(0,
3008            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3009        s.preserveInput(!deleteInputs);
3010        s.doSync();
3011        a.add(s);
3012      }
3013      factor = (inNames.length < factor) ? inNames.length : factor;
3014      // pass in object to report progress, if present
3015      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3016      return mQueue.merge();
3017    }
3018
3019    /**
3020     * Clones the attributes (like compression of the input file and creates a 
3021     * corresponding Writer
3022     * @param inputFile the path of the input file whose attributes should be 
3023     * cloned
3024     * @param outputFile the path of the output file 
3025     * @param prog the Progressable to report status during the file write
3026     * @return Writer
3027     * @throws IOException
3028     */
3029    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3030                                      Progressable prog) throws IOException {
3031      Reader reader = new Reader(conf,
3032                                 Reader.file(inputFile),
3033                                 new Reader.OnlyHeaderOption());
3034      CompressionType compress = reader.getCompressionType();
3035      CompressionCodec codec = reader.getCompressionCodec();
3036      reader.close();
3037
3038      Writer writer = createWriter(conf, 
3039                                   Writer.file(outputFile), 
3040                                   Writer.keyClass(keyClass), 
3041                                   Writer.valueClass(valClass), 
3042                                   Writer.compression(compress, codec), 
3043                                   Writer.progressable(prog));
3044      return writer;
3045    }
3046
3047    /**
3048     * Writes records from RawKeyValueIterator into a file represented by the 
3049     * passed writer
3050     * @param records the RawKeyValueIterator
3051     * @param writer the Writer created earlier 
3052     * @throws IOException
3053     */
3054    public void writeFile(RawKeyValueIterator records, Writer writer) 
3055      throws IOException {
3056      while(records.next()) {
3057        writer.appendRaw(records.getKey().getData(), 0, 
3058                         records.getKey().getLength(), records.getValue());
3059      }
3060      writer.sync();
3061    }
3062        
3063    /** Merge the provided files.
3064     * @param inFiles the array of input path names
3065     * @param outFile the final output file
3066     * @throws IOException
3067     */
3068    public void merge(Path[] inFiles, Path outFile) throws IOException {
3069      if (fs.exists(outFile)) {
3070        throw new IOException("already exists: " + outFile);
3071      }
3072      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3073      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3074      
3075      writeFile(r, writer);
3076
3077      writer.close();
3078    }
3079
3080    /** sort calls this to generate the final merged output */
3081    private int mergePass(Path tmpDir) throws IOException {
3082      if(LOG.isDebugEnabled()) {
3083        LOG.debug("running merge pass");
3084      }
3085      Writer writer = cloneFileAttributes(
3086                                          outFile.suffix(".0"), outFile, null);
3087      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3088                                    outFile.suffix(".0.index"), tmpDir);
3089      writeFile(r, writer);
3090
3091      writer.close();
3092      return 0;
3093    }
3094
3095    /** Used by mergePass to merge the output of the sort
3096     * @param inName the name of the input file containing sorted segments
3097     * @param indexIn the offsets of the sorted segments
3098     * @param tmpDir the relative directory to store intermediate results in
3099     * @return RawKeyValueIterator
3100     * @throws IOException
3101     */
3102    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3103      throws IOException {
3104      //get the segments from indexIn
3105      //we create a SegmentContainer so that we can track segments belonging to
3106      //inName and delete inName as soon as we see that we have looked at all
3107      //the contained segments during the merge process & hence don't need 
3108      //them anymore
3109      SegmentContainer container = new SegmentContainer(inName, indexIn);
3110      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3111      return mQueue.merge();
3112    }
3113    
3114    /** This class implements the core of the merge logic */
3115    private class MergeQueue extends PriorityQueue 
3116      implements RawKeyValueIterator {
3117      private boolean compress;
3118      private boolean blockCompress;
3119      private DataOutputBuffer rawKey = new DataOutputBuffer();
3120      private ValueBytes rawValue;
3121      private long totalBytesProcessed;
3122      private float progPerByte;
3123      private Progress mergeProgress = new Progress();
3124      private Path tmpDir;
3125      private Progressable progress = null; //handle to the progress reporting object
3126      private SegmentDescriptor minSegment;
3127      
3128      //a TreeMap used to store the segments sorted by size (segment offset and
3129      //segment path name is used to break ties between segments of same sizes)
3130      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3131        new TreeMap<SegmentDescriptor, Void>();
3132            
3133      @SuppressWarnings("unchecked")
3134      public void put(SegmentDescriptor stream) throws IOException {
3135        if (size() == 0) {
3136          compress = stream.in.isCompressed();
3137          blockCompress = stream.in.isBlockCompressed();
3138        } else if (compress != stream.in.isCompressed() || 
3139                   blockCompress != stream.in.isBlockCompressed()) {
3140          throw new IOException("All merged files must be compressed or not.");
3141        } 
3142        super.put(stream);
3143      }
3144      
3145      /**
3146       * A queue of file segments to merge
3147       * @param segments the file segments to merge
3148       * @param tmpDir a relative local directory to save intermediate files in
3149       * @param progress the reference to the Progressable object
3150       */
3151      public MergeQueue(List <SegmentDescriptor> segments,
3152          Path tmpDir, Progressable progress) {
3153        int size = segments.size();
3154        for (int i = 0; i < size; i++) {
3155          sortedSegmentSizes.put(segments.get(i), null);
3156        }
3157        this.tmpDir = tmpDir;
3158        this.progress = progress;
3159      }
3160      protected boolean lessThan(Object a, Object b) {
3161        // indicate we're making progress
3162        if (progress != null) {
3163          progress.progress();
3164        }
3165        SegmentDescriptor msa = (SegmentDescriptor)a;
3166        SegmentDescriptor msb = (SegmentDescriptor)b;
3167        return comparator.compare(msa.getKey().getData(), 0, 
3168                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
3169                                  msb.getKey().getLength()) < 0;
3170      }
3171      public void close() throws IOException {
3172        SegmentDescriptor ms;                           // close inputs
3173        while ((ms = (SegmentDescriptor)pop()) != null) {
3174          ms.cleanup();
3175        }
3176        minSegment = null;
3177      }
3178      public DataOutputBuffer getKey() throws IOException {
3179        return rawKey;
3180      }
3181      public ValueBytes getValue() throws IOException {
3182        return rawValue;
3183      }
3184      public boolean next() throws IOException {
3185        if (size() == 0)
3186          return false;
3187        if (minSegment != null) {
3188          //minSegment is non-null for all invocations of next except the first
3189          //one. For the first invocation, the priority queue is ready for use
3190          //but for the subsequent invocations, first adjust the queue 
3191          adjustPriorityQueue(minSegment);
3192          if (size() == 0) {
3193            minSegment = null;
3194            return false;
3195          }
3196        }
3197        minSegment = (SegmentDescriptor)top();
3198        long startPos = minSegment.in.getPosition(); // Current position in stream
3199        //save the raw key reference
3200        rawKey = minSegment.getKey();
3201        //load the raw value. Re-use the existing rawValue buffer
3202        if (rawValue == null) {
3203          rawValue = minSegment.in.createValueBytes();
3204        }
3205        minSegment.nextRawValue(rawValue);
3206        long endPos = minSegment.in.getPosition(); // End position after reading value
3207        updateProgress(endPos - startPos);
3208        return true;
3209      }
3210      
3211      public Progress getProgress() {
3212        return mergeProgress; 
3213      }
3214
3215      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3216        long startPos = ms.in.getPosition(); // Current position in stream
3217        boolean hasNext = ms.nextRawKey();
3218        long endPos = ms.in.getPosition(); // End position after reading key
3219        updateProgress(endPos - startPos);
3220        if (hasNext) {
3221          adjustTop();
3222        } else {
3223          pop();
3224          ms.cleanup();
3225        }
3226      }
3227
3228      private void updateProgress(long bytesProcessed) {
3229        totalBytesProcessed += bytesProcessed;
3230        if (progPerByte > 0) {
3231          mergeProgress.set(totalBytesProcessed * progPerByte);
3232        }
3233      }
3234      
3235      /** This is the single level merge that is called multiple times 
3236       * depending on the factor size and the number of segments
3237       * @return RawKeyValueIterator
3238       * @throws IOException
3239       */
3240      public RawKeyValueIterator merge() throws IOException {
3241        //create the MergeStreams from the sorted map created in the constructor
3242        //and dump the final output to a file
3243        int numSegments = sortedSegmentSizes.size();
3244        int origFactor = factor;
3245        int passNo = 1;
3246        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3247        do {
3248          //get the factor for this pass of merge
3249          factor = getPassFactor(passNo, numSegments);
3250          List<SegmentDescriptor> segmentsToMerge =
3251            new ArrayList<SegmentDescriptor>();
3252          int segmentsConsidered = 0;
3253          int numSegmentsToConsider = factor;
3254          while (true) {
3255            //extract the smallest 'factor' number of segment pointers from the 
3256            //TreeMap. Call cleanup on the empty segments (no key/value data)
3257            SegmentDescriptor[] mStream = 
3258              getSegmentDescriptors(numSegmentsToConsider);
3259            for (int i = 0; i < mStream.length; i++) {
3260              if (mStream[i].nextRawKey()) {
3261                segmentsToMerge.add(mStream[i]);
3262                segmentsConsidered++;
3263                // Count the fact that we read some bytes in calling nextRawKey()
3264                updateProgress(mStream[i].in.getPosition());
3265              }
3266              else {
3267                mStream[i].cleanup();
3268                numSegments--; //we ignore this segment for the merge
3269              }
3270            }
3271            //if we have the desired number of segments
3272            //or looked at all available segments, we break
3273            if (segmentsConsidered == factor || 
3274                sortedSegmentSizes.size() == 0) {
3275              break;
3276            }
3277              
3278            numSegmentsToConsider = factor - segmentsConsidered;
3279          }
3280          //feed the streams to the priority queue
3281          initialize(segmentsToMerge.size()); clear();
3282          for (int i = 0; i < segmentsToMerge.size(); i++) {
3283            put(segmentsToMerge.get(i));
3284          }
3285          //if we have lesser number of segments remaining, then just return the
3286          //iterator, else do another single level merge
3287          if (numSegments <= factor) {
3288            //calculate the length of the remaining segments. Required for 
3289            //calculating the merge progress
3290            long totalBytes = 0;
3291            for (int i = 0; i < segmentsToMerge.size(); i++) {
3292              totalBytes += segmentsToMerge.get(i).segmentLength;
3293            }
3294            if (totalBytes != 0) //being paranoid
3295              progPerByte = 1.0f / (float)totalBytes;
3296            //reset factor to what it originally was
3297            factor = origFactor;
3298            return this;
3299          } else {
3300            //we want to spread the creation of temp files on multiple disks if 
3301            //available under the space constraints
3302            long approxOutputSize = 0; 
3303            for (SegmentDescriptor s : segmentsToMerge) {
3304              approxOutputSize += s.segmentLength + 
3305                                  ChecksumFileSystem.getApproxChkSumLength(
3306                                  s.segmentLength);
3307            }
3308            Path tmpFilename = 
3309              new Path(tmpDir, "intermediate").suffix("." + passNo);
3310
3311            Path outputFile =  lDirAlloc.getLocalPathForWrite(
3312                                                tmpFilename.toString(),
3313                                                approxOutputSize, conf);
3314            if(LOG.isDebugEnabled()) { 
3315              LOG.debug("writing intermediate results to " + outputFile);
3316            }
3317            Writer writer = cloneFileAttributes(
3318                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3319                                                fs.makeQualified(outputFile), null);
3320            writer.sync = null; //disable sync for temp files
3321            writeFile(this, writer);
3322            writer.close();
3323            
3324            //we finished one single level merge; now clean up the priority 
3325            //queue
3326            this.close();
3327            
3328            SegmentDescriptor tempSegment = 
3329              new SegmentDescriptor(0,
3330                  fs.getFileStatus(outputFile).getLen(), outputFile);
3331            //put the segment back in the TreeMap
3332            sortedSegmentSizes.put(tempSegment, null);
3333            numSegments = sortedSegmentSizes.size();
3334            passNo++;
3335          }
3336          //we are worried about only the first pass merge factor. So reset the 
3337          //factor to what it originally was
3338          factor = origFactor;
3339        } while(true);
3340      }
3341  
3342      //Hadoop-591
3343      public int getPassFactor(int passNo, int numSegments) {
3344        if (passNo > 1 || numSegments <= factor || factor == 1) 
3345          return factor;
3346        int mod = (numSegments - 1) % (factor - 1);
3347        if (mod == 0)
3348          return factor;
3349        return mod + 1;
3350      }
3351      
3352      /** Return (& remove) the requested number of segment descriptors from the
3353       * sorted map.
3354       */
3355      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
3356        if (numDescriptors > sortedSegmentSizes.size())
3357          numDescriptors = sortedSegmentSizes.size();
3358        SegmentDescriptor[] SegmentDescriptors = 
3359          new SegmentDescriptor[numDescriptors];
3360        Iterator iter = sortedSegmentSizes.keySet().iterator();
3361        int i = 0;
3362        while (i < numDescriptors) {
3363          SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
3364          iter.remove();
3365        }
3366        return SegmentDescriptors;
3367      }
3368    } // SequenceFile.Sorter.MergeQueue
3369
3370    /** This class defines a merge segment. This class can be subclassed to 
3371     * provide a customized cleanup method implementation. In this 
3372     * implementation, cleanup closes the file handle and deletes the file 
3373     */
3374    public class SegmentDescriptor implements Comparable {
3375      
3376      long segmentOffset; //the start of the segment in the file
3377      long segmentLength; //the length of the segment
3378      Path segmentPathName; //the path name of the file containing the segment
3379      boolean ignoreSync = true; //set to true for temp files
3380      private Reader in = null; 
3381      private DataOutputBuffer rawKey = null; //this will hold the current key
3382      private boolean preserveInput = false; //delete input segment files?
3383      
3384      /** Constructs a segment
3385       * @param segmentOffset the offset of the segment in the file
3386       * @param segmentLength the length of the segment
3387       * @param segmentPathName the path name of the file containing the segment
3388       */
3389      public SegmentDescriptor (long segmentOffset, long segmentLength, 
3390                                Path segmentPathName) {
3391        this.segmentOffset = segmentOffset;
3392        this.segmentLength = segmentLength;
3393        this.segmentPathName = segmentPathName;
3394      }
3395      
3396      /** Do the sync checks */
3397      public void doSync() {ignoreSync = false;}
3398      
3399      /** Whether to delete the files when no longer needed */
3400      public void preserveInput(boolean preserve) {
3401        preserveInput = preserve;
3402      }
3403
3404      public boolean shouldPreserveInput() {
3405        return preserveInput;
3406      }
3407      
3408      public int compareTo(Object o) {
3409        SegmentDescriptor that = (SegmentDescriptor)o;
3410        if (this.segmentLength != that.segmentLength) {
3411          return (this.segmentLength < that.segmentLength ? -1 : 1);
3412        }
3413        if (this.segmentOffset != that.segmentOffset) {
3414          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3415        }
3416        return (this.segmentPathName.toString()).
3417          compareTo(that.segmentPathName.toString());
3418      }
3419
3420      public boolean equals(Object o) {
3421        if (!(o instanceof SegmentDescriptor)) {
3422          return false;
3423        }
3424        SegmentDescriptor that = (SegmentDescriptor)o;
3425        if (this.segmentLength == that.segmentLength &&
3426            this.segmentOffset == that.segmentOffset &&
3427            this.segmentPathName.toString().equals(
3428              that.segmentPathName.toString())) {
3429          return true;
3430        }
3431        return false;
3432      }
3433
3434      public int hashCode() {
3435        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3436      }
3437
3438      /** Fills up the rawKey object with the key returned by the Reader
3439       * @return true if there is a key returned; false, otherwise
3440       * @throws IOException
3441       */
3442      public boolean nextRawKey() throws IOException {
3443        if (in == null) {
3444          int bufferSize = getBufferSize(conf); 
3445          Reader reader = new Reader(conf,
3446                                     Reader.file(segmentPathName), 
3447                                     Reader.bufferSize(bufferSize),
3448                                     Reader.start(segmentOffset), 
3449                                     Reader.length(segmentLength));
3450        
3451          //sometimes we ignore syncs especially for temp merge files
3452          if (ignoreSync) reader.ignoreSync();
3453
3454          if (reader.getKeyClass() != keyClass)
3455            throw new IOException("wrong key class: " + reader.getKeyClass() +
3456                                  " is not " + keyClass);
3457          if (reader.getValueClass() != valClass)
3458            throw new IOException("wrong value class: "+reader.getValueClass()+
3459                                  " is not " + valClass);
3460          this.in = reader;
3461          rawKey = new DataOutputBuffer();
3462        }
3463        rawKey.reset();
3464        int keyLength = 
3465          in.nextRawKey(rawKey);
3466        return (keyLength >= 0);
3467      }
3468
3469      /** Fills up the passed rawValue with the value corresponding to the key
3470       * read earlier
3471       * @param rawValue
3472       * @return the length of the value
3473       * @throws IOException
3474       */
3475      public int nextRawValue(ValueBytes rawValue) throws IOException {
3476        int valLength = in.nextRawValue(rawValue);
3477        return valLength;
3478      }
3479      
3480      /** Returns the stored rawKey */
3481      public DataOutputBuffer getKey() {
3482        return rawKey;
3483      }
3484      
3485      /** closes the underlying reader */
3486      private void close() throws IOException {
3487        this.in.close();
3488        this.in = null;
3489      }
3490
3491      /** The default cleanup. Subclasses can override this with a custom 
3492       * cleanup 
3493       */
3494      public void cleanup() throws IOException {
3495        close();
3496        if (!preserveInput) {
3497          fs.delete(segmentPathName, true);
3498        }
3499      }
3500    } // SequenceFile.Sorter.SegmentDescriptor
3501    
3502    /** This class provisions multiple segments contained within a single
3503     *  file
3504     */
3505    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3506
3507      SegmentContainer parentContainer = null;
3508
3509      /** Constructs a segment
3510       * @param segmentOffset the offset of the segment in the file
3511       * @param segmentLength the length of the segment
3512       * @param segmentPathName the path name of the file containing the segment
3513       * @param parent the parent SegmentContainer that holds the segment
3514       */
3515      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3516                                       Path segmentPathName, SegmentContainer parent) {
3517        super(segmentOffset, segmentLength, segmentPathName);
3518        this.parentContainer = parent;
3519      }
3520      /** The default cleanup. Subclasses can override this with a custom 
3521       * cleanup 
3522       */
3523      public void cleanup() throws IOException {
3524        super.close();
3525        if (super.shouldPreserveInput()) return;
3526        parentContainer.cleanup();
3527      }
3528      
3529      public boolean equals(Object o) {
3530        if (!(o instanceof LinkedSegmentsDescriptor)) {
3531          return false;
3532        }
3533        return super.equals(o);
3534      }
3535    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3536
3537    /** The class that defines a container for segments to be merged. Primarily
3538     * required to delete temp files as soon as all the contained segments
3539     * have been looked at */
3540    private class SegmentContainer {
3541      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3542      private int numSegmentsContained; //# of segments contained
3543      private Path inName; //input file from where segments are created
3544      
3545      //the list of segments read from the file
3546      private ArrayList <SegmentDescriptor> segments = 
3547        new ArrayList <SegmentDescriptor>();
3548      /** This constructor is there primarily to serve the sort routine that 
3549       * generates a single output file with an associated index file */
3550      public SegmentContainer(Path inName, Path indexIn) throws IOException {
3551        //get the segments from indexIn
3552        FSDataInputStream fsIndexIn = fs.open(indexIn);
3553        long end = fs.getFileStatus(indexIn).getLen();
3554        while (fsIndexIn.getPos() < end) {
3555          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3556          long segmentLength = WritableUtils.readVLong(fsIndexIn);
3557          Path segmentName = inName;
3558          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3559                                                    segmentLength, segmentName, this));
3560        }
3561        fsIndexIn.close();
3562        fs.delete(indexIn, true);
3563        numSegmentsContained = segments.size();
3564        this.inName = inName;
3565      }
3566
3567      public List <SegmentDescriptor> getSegmentList() {
3568        return segments;
3569      }
3570      public void cleanup() throws IOException {
3571        numSegmentsCleanedUp++;
3572        if (numSegmentsCleanedUp == numSegmentsContained) {
3573          fs.delete(inName, true);
3574        }
3575      }
3576    } //SequenceFile.Sorter.SegmentContainer
3577
3578  } // SequenceFile.Sorter
3579
3580} // SequenceFile