/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_SKIP_CHECKSUM_ERRORS_KEY;

/** 
 * <code>SequenceFile</code>s are flat files consisting of binary key/value 
 * pairs.
 * 
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 * 
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp; 
 *                                      values are collected in 'blocks' 
 *                                      separately and compressed. The size of 
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 * 
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 * 
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
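 *
 * <p>For example, a file of {@link Text} keys and values might be written and
 * read back as follows. This is a minimal sketch, not the only supported
 * pattern; the path and the key/value types are illustrative:
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Path path = new Path("/tmp/example.seq");
 *
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(Text.class),
 *     SequenceFile.Writer.valueClass(Text.class));
 * try {
 *   writer.append(new Text("key"), new Text("value"));
 * } finally {
 *   writer.close();
 * }
 *
 * SequenceFile.Reader reader = new SequenceFile.Reader(conf,
 *     SequenceFile.Reader.file(path));
 * try {
 *   Text key = new Text();
 *   Text value = new Text();
 *   while (reader.next(key, value)) {
 *     // process the current key/value pair
 *   }
 * } finally {
 *   reader.close();
 * }
 * }</pre>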
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 * 
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 * 
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for 
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is 
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for  
 *                       compression of keys and/or values (if compression is 
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
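 * 
 * <p>For instance, a version-6 file begins with the four bytes
 * <code>0x53 0x45 0x51 0x06</code> ("SEQ" in ASCII followed by the version
 * byte).</p>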
 * 
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker roughly every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker roughly every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 * 
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every block.
 * </li>
 * </ul>
 * 
 * <p>The compressed blocks of key lengths and value lengths consist of the 
 * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
 * format.</p>
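 *
 * <p>As a sketch, the zero-compressed integer encoding corresponds to the
 * variable-length integers in {@link WritableUtils}; a length can be written
 * and read back as follows (the buffer types are illustrative):
 * <pre>{@code
 * DataOutputBuffer out = new DataOutputBuffer();
 * WritableUtils.writeVInt(out, 12345);     // zero-compressed length
 *
 * DataInputBuffer in = new DataInputBuffer();
 * in.reset(out.getData(), out.getLength());
 * int length = WritableUtils.readVInt(in); // 12345
 * }</pre>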
 * 
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static final byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points: 100 * (4 + 16) = 2000 bytes. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 

  /** 
   * The compression type used to compress key/value pairs in the 
   * {@link SequenceFile}.
   * 
   * @see SequenceFile.Writer
   */
  public enum CompressionType {
    /** Do not compress records. */
    NONE, 
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the compression type for the reduce outputs.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  public static CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD : 
      CompressionType.valueOf(name);
  }
  
  /**
   * Set the default compression type for sequence files.
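   * <p>For example (a sketch):
   * {@code SequenceFile.setDefaultCompressionType(conf, CompressionType.BLOCK);}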
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  public static void setDefaultCompressionType(Configuration job, 
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
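   * <p>A block-compressed writer, for instance, might be created as follows
   * (a sketch; the path, key/value types and codec are illustrative):
   * <pre>{@code
   * Writer writer = SequenceFile.createWriter(conf,
   *     Writer.file(new Path("/tmp/data.seq")),
   *     Writer.keyClass(LongWritable.class),
   *     Writer.valueClass(Text.class),
   *     Writer.compression(CompressionType.BLOCK, new DefaultCodec()));
   * }</pre>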
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption = 
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType, 
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.bufferSize(bufferSize), 
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
  createWriter(FileSystem fs, Configuration conf, Path name,
               Class keyClass, Class valClass, int bufferSize,
               short replication, long blockSize, boolean createParent,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
  createWriter(FileContext fc, Configuration conf, Path name,
               Class keyClass, Class valClass,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata,
               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
               throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }
  
  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }

  /** The interface to 'raw' values of SequenceFiles. */
  public interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream : Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream. 
     * Note that it will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream : Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException;

    /**
     * Size of the stored data, in bytes.
     */
    public int getSize();
  }
  
  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    
    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }
    
    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;                    // invalidate until the read completes
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      throw 
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes
  
  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      } 
      dataSize = -1;                    // invalidate until the read completes
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, buffer.length)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes
  
  /**
   * A class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
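   * <p>For example (a sketch; the attribute names are illustrative):
   * <pre>{@code
   * SequenceFile.Metadata meta = new SequenceFile.Metadata();
   * meta.set(new Text("created-by"), new Text("example-job"));
   * }</pre>
   *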
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;
    
    public Metadata() {
      this(new TreeMap<Text, Text>());
    }
    
    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }
    
    public Text get(Text name) {
      return this.theMetadata.get(name);
    }
    
    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }
    
    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) {
        throw new IOException(
            "Invalid size: " + sz + " for file metadata object");
      }
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }    
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }
    
    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do 
    }
    
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t")
          .append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
  
  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;
    
    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {                                       
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(StandardCharsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public interface Option {}
    
    static class FileOption extends Options.PathOption 
                                    implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption 
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }
    
    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption implements
        Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                          implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }
    
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }
    
    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }
    
    public static Option replication(short value) {
      return new ReplicationOption(value);
    }
    
    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }
    
    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }
    
    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }
    
    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }
    
    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf, 
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption = 
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption = 
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption = 
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption = 
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
          AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption = 
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption = 
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption = 
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException(
            "exactly one of file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ? 
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("MetaData Option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            // Codec comparison will be ignored if the compression is NONE
            if (readerCompressionOption.value != compressionTypeOption.value
                || (readerCompressionOption.value != CompressionType.NONE
                    && readerCompressionOption.codec
                        .getClass() != compressionTypeOption.codec
                            .getClass())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null, 
           new Metadata());
    }
    
    /** Create the named file with write-progress reporter.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }
    
    /** Create the named file with write-progress reporter. 
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
    
    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader() 
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());
      
      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());
      
      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata) 
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut = 
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }

      if (appendMode) {
        sync();
      } else {
        writeFileHeader();
      }
    }
    
    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }
    
    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }

    /**
     * flush all currently written data to the file system
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.sync();                               // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }
    
    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }
    
    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;
      
      if (out != null) {
        
        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();
      
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }
1430
1431    /** Returns the current length of the output file.
1432     *
1433     * <p>This always returns a synchronized position.  In other words,
1434     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
1435     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
1436     * the key may be earlier in the file than key last written when this
1437     * method was called (e.g., with block-compression, it may be the first key
1438     * in the block that was being written when this method was called).
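     *
     * <p>For example, a position recorded here can later be used to resume
     * reading (a sketch, assuming a {@code Reader} opened on the same file):
     * <pre>{@code
     * long pos = writer.getLength();
     * // ... later ...
     * reader.seek(pos);
     * }</pre>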
1439     */
1440    public synchronized long getLength() throws IOException {
1441      return out.getPos();
1442    }
1443
1444  } // class Writer
1445
1446  /** Write key/compressed-value pairs to a sequence-format file. */
1447  static class RecordCompressWriter extends Writer {
1448    
1449    RecordCompressWriter(Configuration conf, 
1450                         Option... options) throws IOException {
1451      super(conf, options);
1452    }
1453
1454    /** Append a key/value pair. */
1455    @Override
1456    @SuppressWarnings("unchecked")
1457    public synchronized void append(Object key, Object val)
1458      throws IOException {
1459      if (key.getClass() != keyClass)
1460        throw new IOException("wrong key class: "+key.getClass().getName()
1461                              +" is not "+keyClass);
1462      if (val.getClass() != valClass)
1463        throw new IOException("wrong value class: "+val.getClass().getName()
1464                              +" is not "+valClass);
1465
1466      buffer.reset();
1467
1468      // Append the 'key'
1469      keySerializer.serialize(key);
1470      int keyLength = buffer.getLength();
1471      if (keyLength < 0)
1472        throw new IOException("negative length keys not allowed: " + key);
1473
1474      // Compress 'value' and append it
1475      deflateFilter.resetState();
1476      compressedValSerializer.serialize(val);
1477      deflateOut.flush();
1478      deflateFilter.finish();
1479
1480      // Write the record out
1481      checkAndWriteSync();                                // sync
1482      out.writeInt(buffer.getLength());                   // total record length
1483      out.writeInt(keyLength);                            // key portion length
1484      out.write(buffer.getData(), 0, buffer.getLength()); // data
1485    }
1486
1487    /** Append a key/value pair. */
1488    @Override
1489    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1490        int keyLength, ValueBytes val) throws IOException {
1491
1492      if (keyLength < 0)
1493        throw new IOException("negative length keys not allowed: " + keyLength);
1494
1495      int valLength = val.getSize();
1496      
1497      checkAndWriteSync();                        // sync
1498      out.writeInt(keyLength+valLength);          // total record length
1499      out.writeInt(keyLength);                    // key portion length
1500      out.write(keyData, keyOffset, keyLength);   // 'key' data
1501      val.writeCompressedBytes(out);              // 'value' data
1502    }
1503    
  } // RecordCompressWriter
1505
1506  /** Write compressed key/value blocks to a sequence-format file. */
1507  static class BlockCompressWriter extends Writer {
1508    
1509    private int noBufferedRecords = 0;
1510    
1511    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1512    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1513
1514    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1515    private DataOutputBuffer valBuffer = new DataOutputBuffer();
1516
1517    private final int compressionBlockSize;
1518    
1519    BlockCompressWriter(Configuration conf,
1520                        Option... options) throws IOException {
1521      super(conf, options);
1522      compressionBlockSize = 
1523        conf.getInt(IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY,
1524            IO_SEQFILE_COMPRESS_BLOCKSIZE_DEFAULT
1525        );
1526      keySerializer.close();
1527      keySerializer.open(keyBuffer);
1528      uncompressedValSerializer.close();
1529      uncompressedValSerializer.open(valBuffer);
1530    }
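    // The block-size threshold may be tuned via configuration; an
    // illustrative sketch (the value shown is an assumption, not the
    // shipped default):
    //   conf.setInt(IO_SEQFILE_COMPRESS_BLOCKSIZE_KEY, 4 * 1024 * 1024);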
1531
1532    /** Workhorse to check and write out compressed data/lengths */
1533    private synchronized 
1534      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1535      throws IOException {
1536      deflateFilter.resetState();
1537      buffer.reset();
1538      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1539                       uncompressedDataBuffer.getLength());
1540      deflateOut.flush();
1541      deflateFilter.finish();
1542      
1543      WritableUtils.writeVInt(out, buffer.getLength());
1544      out.write(buffer.getData(), 0, buffer.getLength());
1545    }
1546    
    /** Compress buffered records and flush the block to the underlying stream. */
1548    @Override
1549    public synchronized void sync() throws IOException {
1550      if (noBufferedRecords > 0) {
1551        super.sync();
1552        
1553        // No. of records
1554        WritableUtils.writeVInt(out, noBufferedRecords);
1555        
1556        // Write 'keys' and lengths
1557        writeBuffer(keyLenBuffer);
1558        writeBuffer(keyBuffer);
1559        
1560        // Write 'values' and lengths
1561        writeBuffer(valLenBuffer);
1562        writeBuffer(valBuffer);
1563        
1564        // Flush the file-stream
1565        out.flush();
1566        
1567        // Reset internal states
1568        keyLenBuffer.reset();
1569        keyBuffer.reset();
1570        valLenBuffer.reset();
1571        valBuffer.reset();
1572        noBufferedRecords = 0;
1573      }
1574      
1575    }
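    // On-disk layout of one compressed block, as emitted by sync() above:
    //   sync escape (4 bytes) + sync hash      (written by super.sync())
    //   vint         : number of buffered records
    //   vint + bytes : compressed key-lengths block
    //   vint + bytes : compressed keys block
    //   vint + bytes : compressed value-lengths block
    //   vint + bytes : compressed values block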
1576    
1577    /** Close the file. */
1578    @Override
1579    public synchronized void close() throws IOException {
1580      if (out != null) {
1581        sync();
1582      }
1583      super.close();
1584    }
1585
1586    /** Append a key/value pair. */
1587    @Override
1588    @SuppressWarnings("unchecked")
1589    public synchronized void append(Object key, Object val)
1590      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);
1595
1596      // Save key/value into respective buffers 
1597      int oldKeyLength = keyBuffer.getLength();
1598      keySerializer.serialize(key);
1599      int keyLength = keyBuffer.getLength() - oldKeyLength;
1600      if (keyLength < 0)
1601        throw new IOException("negative length keys not allowed: " + key);
1602      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1603
1604      int oldValLength = valBuffer.getLength();
1605      uncompressedValSerializer.serialize(val);
1606      int valLength = valBuffer.getLength() - oldValLength;
1607      WritableUtils.writeVInt(valLenBuffer, valLength);
1608      
1609      // Added another key/value pair
1610      ++noBufferedRecords;
1611      
1612      // Compress and flush?
1613      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1614      if (currentBlockSize >= compressionBlockSize) {
1615        sync();
1616      }
1617    }
1618    
1619    /** Append a key/value pair. */
1620    @Override
1621    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1622        int keyLength, ValueBytes val) throws IOException {
1623      
1624      if (keyLength < 0)
1625        throw new IOException("negative length keys not allowed");
1626
1627      int valLength = val.getSize();
1628      
1629      // Save key/value data in relevant buffers
1630      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1631      keyBuffer.write(keyData, keyOffset, keyLength);
1632      WritableUtils.writeVInt(valLenBuffer, valLength);
1633      val.writeUncompressedBytes(valBuffer);
1634
1635      // Added another key/value pair
1636      ++noBufferedRecords;
1637
1638      // Compress and flush?
1639      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1640      if (currentBlockSize >= compressionBlockSize) {
1641        sync();
1642      }
1643    }
1644  
  } // BlockCompressWriter
1646
1647  /** Get the configured buffer size */
1648  private static int getBufferSize(Configuration conf) {
1649    return conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
1650  }
1651
1652  /** Reads key/value pairs from a sequence-format file. */
1653  public static class Reader implements java.io.Closeable {
1654    private String filename;
1655    private FSDataInputStream in;
1656    private DataOutputBuffer outBuf = new DataOutputBuffer();
1657
1658    private byte version;
1659
1660    private String keyClassName;
1661    private String valClassName;
1662    private Class keyClass;
1663    private Class valClass;
1664
1665    private CompressionCodec codec = null;
1666    private Metadata metadata = null;
1667    
1668    private byte[] sync = new byte[SYNC_HASH_SIZE];
1669    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1670    private boolean syncSeen;
1671
1672    private long headerEnd;
1673    private long end;
1674    private int keyLength;
1675    private int recordLength;
1676
1677    private boolean decompress;
1678    private boolean blockCompressed;
1679    
1680    private Configuration conf;
1681
1682    private int noBufferedRecords = 0;
1683    private boolean lazyDecompress = true;
1684    private boolean valuesDecompressed = true;
1685    
1686    private int noBufferedKeys = 0;
1687    private int noBufferedValues = 0;
1688    
1689    private DataInputBuffer keyLenBuffer = null;
1690    private CompressionInputStream keyLenInFilter = null;
1691    private DataInputStream keyLenIn = null;
1692    private Decompressor keyLenDecompressor = null;
1693    private DataInputBuffer keyBuffer = null;
1694    private CompressionInputStream keyInFilter = null;
1695    private DataInputStream keyIn = null;
1696    private Decompressor keyDecompressor = null;
1697
1698    private DataInputBuffer valLenBuffer = null;
1699    private CompressionInputStream valLenInFilter = null;
1700    private DataInputStream valLenIn = null;
1701    private Decompressor valLenDecompressor = null;
1702    private DataInputBuffer valBuffer = null;
1703    private CompressionInputStream valInFilter = null;
1704    private DataInputStream valIn = null;
1705    private Decompressor valDecompressor = null;
1706    
1707    private Deserializer keyDeserializer;
1708    private Deserializer valDeserializer;
1709
1710    /**
1711     * A tag interface for all of the Reader options
1712     */
1713    public static interface Option {}
1714    
1715    /**
1716     * Create an option to specify the path name of the sequence file.
1717     * @param value the path to read
1718     * @return a new option
1719     */
1720    public static Option file(Path value) {
1721      return new FileOption(value);
1722    }
1723    
1724    /**
1725     * Create an option to specify the stream with the sequence file.
1726     * @param value the stream to read.
1727     * @return a new option
1728     */
1729    public static Option stream(FSDataInputStream value) {
1730      return new InputStreamOption(value);
1731    }
1732    
1733    /**
1734     * Create an option to specify the starting byte to read.
1735     * @param value the number of bytes to skip over
1736     * @return a new option
1737     */
1738    public static Option start(long value) {
1739      return new StartOption(value);
1740    }
1741    
1742    /**
1743     * Create an option to specify the number of bytes to read.
1744     * @param value the number of bytes to read
1745     * @return a new option
1746     */
1747    public static Option length(long value) {
1748      return new LengthOption(value);
1749    }
1750    
1751    /**
1752     * Create an option with the buffer size for reading the given pathname.
1753     * @param value the number of bytes to buffer
1754     * @return a new option
1755     */
1756    public static Option bufferSize(int value) {
1757      return new BufferSizeOption(value);
1758    }
1759
1760    private static class FileOption extends Options.PathOption 
1761                                    implements Option {
1762      private FileOption(Path value) {
1763        super(value);
1764      }
1765    }
1766    
1767    private static class InputStreamOption
1768        extends Options.FSDataInputStreamOption 
1769        implements Option {
1770      private InputStreamOption(FSDataInputStream value) {
1771        super(value);
1772      }
1773    }
1774
1775    private static class StartOption extends Options.LongOption
1776                                     implements Option {
1777      private StartOption(long value) {
1778        super(value);
1779      }
1780    }
1781
1782    private static class LengthOption extends Options.LongOption
1783                                      implements Option {
1784      private LengthOption(long value) {
1785        super(value);
1786      }
1787    }
1788
1789    private static class BufferSizeOption extends Options.IntegerOption
1790                                      implements Option {
1791      private BufferSizeOption(int value) {
1792        super(value);
1793      }
1794    }
1795
    // used only internally; there is no public factory method for this option
1797    private static class OnlyHeaderOption extends Options.BooleanOption 
1798                                          implements Option {
1799      private OnlyHeaderOption() {
1800        super(true);
1801      }
1802    }
1803
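    /**
     * Construct a Reader from the given options.  A minimal usage sketch
     * (the path is illustrative):
     * <pre>
     *   SequenceFile.Reader reader = new SequenceFile.Reader(conf,
     *       SequenceFile.Reader.file(new Path("/data/part-00000")));
     * </pre>
     * Exactly one of {@link #file(Path)} or {@link #stream(FSDataInputStream)}
     * must be given, and {@link #bufferSize(int)} requires a file.
     * @param conf the configuration to use
     * @param opts options controlling the open
     * @throws IOException
     */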
1804    public Reader(Configuration conf, Option... opts) throws IOException {
1805      // Look up the options, these are null if not set
1806      FileOption fileOpt = Options.getOption(FileOption.class, opts);
1807      InputStreamOption streamOpt = 
1808        Options.getOption(InputStreamOption.class, opts);
1809      StartOption startOpt = Options.getOption(StartOption.class, opts);
1810      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1811      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1812      OnlyHeaderOption headerOnly = 
1813        Options.getOption(OnlyHeaderOption.class, opts);
1814      // check for consistency
1815      if ((fileOpt == null) == (streamOpt == null)) {
        throw new IllegalArgumentException(
            "Exactly one of the file or stream options must be specified");
1818      }
1819      if (fileOpt == null && bufOpt != null) {
1820        throw new IllegalArgumentException("buffer size can only be set when" +
1821                                           " a file is specified.");
1822      }
1823      // figure out the real values
1824      Path filename = null;
1825      FSDataInputStream file;
1826      final long len;
1827      if (fileOpt != null) {
1828        filename = fileOpt.getValue();
1829        FileSystem fs = filename.getFileSystem(conf);
1830        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1831        len = null == lenOpt
1832          ? fs.getFileStatus(filename).getLen()
1833          : lenOpt.getValue();
1834        file = openFile(fs, filename, bufSize, len);
1835      } else {
1836        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1837        file = streamOpt.getValue();
1838      }
1839      long start = startOpt == null ? 0 : startOpt.getValue();
1840      // really set up
1841      initialize(filename, file, start, len, conf, headerOnly != null);
1842    }
1843
1844    /**
1845     * Construct a reader by opening a file from the given file system.
1846     * @param fs The file system used to open the file.
1847     * @param file The file being read.
1848     * @param conf Configuration
1849     * @throws IOException
1850     * @deprecated Use Reader(Configuration, Option...) instead.
1851     */
1852    @Deprecated
1853    public Reader(FileSystem fs, Path file, 
1854                  Configuration conf) throws IOException {
1855      this(conf, file(file.makeQualified(fs)));
1856    }
1857
1858    /**
1859     * Construct a reader by the given input stream.
1860     * @param in An input stream.
1861     * @param buffersize unused
1862     * @param start The starting position.
1863     * @param length The length being read.
1864     * @param conf Configuration
1865     * @throws IOException
1866     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1867     */
1868    @Deprecated
1869    public Reader(FSDataInputStream in, int buffersize,
1870        long start, long length, Configuration conf) throws IOException {
1871      this(conf, stream(in), start(start), length(length));
1872    }
1873
1874    /** Common work of the constructors. */
1875    private void initialize(Path filename, FSDataInputStream in,
1876                            long start, long length, Configuration conf,
1877                            boolean tempReader) throws IOException {
1878      if (in == null) {
1879        throw new IllegalArgumentException("in == null");
1880      }
1881      this.filename = filename == null ? "<unknown>" : filename.toString();
1882      this.in = in;
1883      this.conf = conf;
1884      boolean succeeded = false;
1885      try {
1886        seek(start);
1887        this.end = this.in.getPos() + length;
1888        // if it wrapped around, use the max
1889        if (end < length) {
1890          end = Long.MAX_VALUE;
1891        }
1892        init(tempReader);
1893        succeeded = true;
1894      } finally {
1895        if (!succeeded) {
1896          IOUtils.cleanup(LOG, this.in);
1897        }
1898      }
1899    }
1900
1901    /**
1902     * Override this method to specialize the type of
1903     * {@link FSDataInputStream} returned.
1904     * @param fs The file system used to open the file.
1905     * @param file The file being read.
1906     * @param bufferSize The buffer size used to read the file.
1907     * @param length The length being read if it is >= 0.  Otherwise,
1908     *               the length is not available.
1909     * @return The opened stream.
1910     * @throws IOException
1911     */
1912    protected FSDataInputStream openFile(FileSystem fs, Path file,
1913        int bufferSize, long length) throws IOException {
1914      return fs.open(file, bufferSize);
1915    }
1916    
1917    /**
1918     * Initialize the {@link Reader}
1919     * @param tmpReader <code>true</code> if we are constructing a temporary
     *                  reader {@link SequenceFile.Sorter#cloneFileAttributes},
1921     *                  and hence do not initialize every component; 
1922     *                  <code>false</code> otherwise.
1923     * @throws IOException
1924     */
1925    private void init(boolean tempReader) throws IOException {
1926      byte[] versionBlock = new byte[VERSION.length];
1927      String exceptionMsg = this + " not a SequenceFile";
1928
1929      // Try to read sequence file header.
1930      try {
1931        in.readFully(versionBlock);
1932      } catch (EOFException e) {
1933        throw new EOFException(exceptionMsg);
1934      }
1935
1936      if ((versionBlock[0] != VERSION[0]) ||
1937          (versionBlock[1] != VERSION[1]) ||
1938          (versionBlock[2] != VERSION[2])) {
        throw new IOException(exceptionMsg);
1940      }
1941
1942      // Set 'version'
1943      version = versionBlock[3];
1944      if (version > VERSION[3]) {
1945        throw new VersionMismatchException(VERSION[3], version);
1946      }
1947
1948      if (version < BLOCK_COMPRESS_VERSION) {
1949        UTF8 className = new UTF8();
1950
1951        className.readFields(in);
1952        keyClassName = className.toStringChecked(); // key class name
1953
1954        className.readFields(in);
1955        valClassName = className.toStringChecked(); // val class name
1956      } else {
1957        keyClassName = Text.readString(in);
1958        valClassName = Text.readString(in);
1959      }
1960
1961      if (version > 2) {                          // if version > 2
1962        this.decompress = in.readBoolean();       // is compressed?
1963      } else {
1964        decompress = false;
1965      }
1966
1967      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1968        this.blockCompressed = in.readBoolean();  // is block-compressed?
1969      } else {
1970        blockCompressed = false;
1971      }
1972      
1973      // if version >= 5
1974      // setup the compression codec
1975      if (decompress) {
1976        if (version >= CUSTOM_COMPRESS_VERSION) {
1977          String codecClassname = Text.readString(in);
1978          try {
1979            Class<? extends CompressionCodec> codecClass
1980              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1981            this.codec = ReflectionUtils.newInstance(codecClass, conf);
1982          } catch (ClassNotFoundException cnfe) {
1983            throw new IllegalArgumentException("Unknown codec: " + 
1984                                               codecClassname, cnfe);
1985          }
1986        } else {
1987          codec = new DefaultCodec();
1988          ((Configurable)codec).setConf(conf);
1989        }
1990      }
1991      
1992      this.metadata = new Metadata();
1993      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1994        this.metadata.readFields(in);
1995      }
1996      
1997      if (version > 1) {                          // if version > 1
1998        in.readFully(sync);                       // read sync bytes
1999        headerEnd = in.getPos();                  // record end of header
2000      }
2001      
      // Initialize... *not* if we are constructing a temporary Reader
2003      if (!tempReader) {
2004        valBuffer = new DataInputBuffer();
2005        if (decompress) {
2006          valDecompressor = CodecPool.getDecompressor(codec);
2007          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
2008          valIn = new DataInputStream(valInFilter);
2009        } else {
2010          valIn = valBuffer;
2011        }
2012
2013        if (blockCompressed) {
2014          keyLenBuffer = new DataInputBuffer();
2015          keyBuffer = new DataInputBuffer();
2016          valLenBuffer = new DataInputBuffer();
2017
2018          keyLenDecompressor = CodecPool.getDecompressor(codec);
2019          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
2020                                                   keyLenDecompressor);
2021          keyLenIn = new DataInputStream(keyLenInFilter);
2022
2023          keyDecompressor = CodecPool.getDecompressor(codec);
2024          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
2025          keyIn = new DataInputStream(keyInFilter);
2026
2027          valLenDecompressor = CodecPool.getDecompressor(codec);
2028          valLenInFilter = codec.createInputStream(valLenBuffer, 
2029                                                   valLenDecompressor);
2030          valLenIn = new DataInputStream(valLenInFilter);
2031        }
2032        
2033        SerializationFactory serializationFactory =
2034          new SerializationFactory(conf);
2035        this.keyDeserializer =
2036          getDeserializer(serializationFactory, getKeyClass());
2037        if (this.keyDeserializer == null) {
2038          throw new IOException(
2039              "Could not find a deserializer for the Key class: '"
2040                  + getKeyClass().getCanonicalName() + "'. "
2041                  + "Please ensure that the configuration '" +
2042                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2043                  + "properly configured, if you're using "
2044                  + "custom serialization.");
2045        }
2046        if (!blockCompressed) {
2047          this.keyDeserializer.open(valBuffer);
2048        } else {
2049          this.keyDeserializer.open(keyIn);
2050        }
2051        this.valDeserializer =
2052          getDeserializer(serializationFactory, getValueClass());
2053        if (this.valDeserializer == null) {
2054          throw new IOException(
2055              "Could not find a deserializer for the Value class: '"
2056                  + getValueClass().getCanonicalName() + "'. "
2057                  + "Please ensure that the configuration '" +
2058                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2059                  + "properly configured, if you're using "
2060                  + "custom serialization.");
2061        }
2062        this.valDeserializer.open(valIn);
2063      }
2064    }
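    // Header layout consumed by init() above, in file order (later fields
    // only for the versions noted in the code):
    //   4 bytes : magic bytes plus one version byte
    //   key class name, then value class name
    //   boolean : compressed?                  (version > 2)
    //   boolean : block-compressed?            (version >= 4)
    //   codec class name                       (version >= 5, if compressed)
    //   metadata                               (version >= 6)
    //   SYNC_HASH_SIZE bytes : sync marker     (version > 1)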
2065    
2066    @SuppressWarnings("unchecked")
2067    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
2068      return sf.getDeserializer(c);
2069    }
2070    
2071    /** Close the file. */
2072    @Override
2073    public synchronized void close() throws IOException {
2074      // Return the decompressors to the pool
2075      CodecPool.returnDecompressor(keyLenDecompressor);
2076      CodecPool.returnDecompressor(keyDecompressor);
2077      CodecPool.returnDecompressor(valLenDecompressor);
2078      CodecPool.returnDecompressor(valDecompressor);
2079      keyLenDecompressor = keyDecompressor = null;
2080      valLenDecompressor = valDecompressor = null;
2081      
2082      if (keyDeserializer != null) {
2083        keyDeserializer.close();
2084      }
2085      if (valDeserializer != null) {
2086        valDeserializer.close();
2087      }
2088      
2089      // Close the input-stream
2090      in.close();
2091    }
2092
2093    /** Returns the name of the key class. */
2094    public String getKeyClassName() {
2095      return keyClassName;
2096    }
2097
2098    /** Returns the class of keys in this file. */
2099    public synchronized Class<?> getKeyClass() {
2100      if (null == keyClass) {
2101        try {
2102          keyClass = WritableName.getClass(getKeyClassName(), conf);
2103        } catch (IOException e) {
2104          throw new RuntimeException(e);
2105        }
2106      }
2107      return keyClass;
2108    }
2109
2110    /** Returns the name of the value class. */
2111    public String getValueClassName() {
2112      return valClassName;
2113    }
2114
2115    /** Returns the class of values in this file. */
2116    public synchronized Class<?> getValueClass() {
2117      if (null == valClass) {
2118        try {
2119          valClass = WritableName.getClass(getValueClassName(), conf);
2120        } catch (IOException e) {
2121          throw new RuntimeException(e);
2122        }
2123      }
2124      return valClass;
2125    }
2126
2127    /** Returns true if values are compressed. */
2128    public boolean isCompressed() { return decompress; }
2129    
2130    /** Returns true if records are block-compressed. */
2131    public boolean isBlockCompressed() { return blockCompressed; }
2132    
2133    /** Returns the compression codec of data in this file. */
2134    public CompressionCodec getCompressionCodec() { return codec; }
2135    
2136    private byte[] getSync() {
2137      return sync;
2138    }
2139
2140    private byte getVersion() {
2141      return version;
2142    }
2143
2144    /**
2145     * Get the compression type for this file.
2146     * @return the compression type
2147     */
2148    public CompressionType getCompressionType() {
2149      if (decompress) {
2150        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2151      } else {
2152        return CompressionType.NONE;
2153      }
2154    }
2155
2156    /** Returns the metadata object of the file */
2157    public Metadata getMetadata() {
2158      return this.metadata;
2159    }
2160    
2161    /** Returns the configuration used for this file. */
2162    Configuration getConf() { return conf; }
2163    
2164    /** Read a compressed buffer */
2165    private synchronized void readBuffer(DataInputBuffer buffer, 
2166                                         CompressionInputStream filter) throws IOException {
2167      // Read data into a temporary buffer
2168      DataOutputBuffer dataBuffer = new DataOutputBuffer();
2169
2170      try {
2171        int dataBufferLength = WritableUtils.readVInt(in);
2172        dataBuffer.write(in, dataBufferLength);
2173      
2174        // Set up 'buffer' connected to the input-stream
2175        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2176      } finally {
2177        dataBuffer.close();
2178      }
2179
2180      // Reset the codec
2181      filter.resetState();
2182    }
2183    
2184    /** Read the next 'compressed' block */
2185    private synchronized void readBlock() throws IOException {
2186      // Check if we need to throw away a whole block of 
2187      // 'values' due to 'lazy decompression' 
2188      if (lazyDecompress && !valuesDecompressed) {
2189        in.seek(WritableUtils.readVInt(in)+in.getPos());
2190        in.seek(WritableUtils.readVInt(in)+in.getPos());
2191      }
2192      
2193      // Reset internal states
2194      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2195      valuesDecompressed = false;
2196
2197      //Process sync
2198      if (sync != null) {
2199        in.readInt();
2200        in.readFully(syncCheck);                // read syncCheck
2201        if (!Arrays.equals(sync, syncCheck))    // check it
2202          throw new IOException("File is corrupt!");
2203      }
2204      syncSeen = true;
2205
2206      // Read number of records in this block
2207      noBufferedRecords = WritableUtils.readVInt(in);
2208      
2209      // Read key lengths and keys
2210      readBuffer(keyLenBuffer, keyLenInFilter);
2211      readBuffer(keyBuffer, keyInFilter);
2212      noBufferedKeys = noBufferedRecords;
2213      
2214      // Read value lengths and values
2215      if (!lazyDecompress) {
2216        readBuffer(valLenBuffer, valLenInFilter);
2217        readBuffer(valBuffer, valInFilter);
2218        noBufferedValues = noBufferedRecords;
2219        valuesDecompressed = true;
2220      }
2221    }
2222
2223    /** 
2224     * Position valLenIn/valIn to the 'value' 
2225     * corresponding to the 'current' key 
2226     */
2227    private synchronized void seekToCurrentValue() throws IOException {
2228      if (!blockCompressed) {
2229        if (decompress) {
2230          valInFilter.resetState();
2231        }
2232        valBuffer.reset();
2233      } else {
2234        // Check if this is the first value in the 'block' to be read
2235        if (lazyDecompress && !valuesDecompressed) {
2236          // Read the value lengths and values
2237          readBuffer(valLenBuffer, valLenInFilter);
2238          readBuffer(valBuffer, valInFilter);
2239          noBufferedValues = noBufferedRecords;
2240          valuesDecompressed = true;
2241        }
2242        
2243        // Calculate the no. of bytes to skip
2244        // Note: 'current' key has already been read!
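        // Worked example: with 10 records in the block and 4 keys already
        // consumed, noBufferedKeys == 6 and currentKey == 7; if no value has
        // been read yet, noBufferedValues == 10, so the loop skips 3 value
        // lengths/bytes, leaving the 4th (current) value next in valIn.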
2245        int skipValBytes = 0;
2246        int currentKey = noBufferedKeys + 1;          
2247        for (int i=noBufferedValues; i > currentKey; --i) {
2248          skipValBytes += WritableUtils.readVInt(valLenIn);
2249          --noBufferedValues;
2250        }
2251        
2252        // Skip to the 'val' corresponding to 'current' key
2253        if (skipValBytes > 0) {
2254          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2255            throw new IOException("Failed to seek to " + currentKey + 
2256                                  "(th) value!");
2257          }
2258        }
2259      }
2260    }
2261
2262    /**
2263     * Get the 'value' corresponding to the last read 'key'.
2264     * @param val : The 'value' to be read.
2265     * @throws IOException
2266     */
2267    public synchronized void getCurrentValue(Writable val) 
2268      throws IOException {
2269      if (val instanceof Configurable) {
2270        ((Configurable) val).setConf(this.conf);
2271      }
2272
2273      // Position stream to 'current' value
2274      seekToCurrentValue();
2275
2276      if (!blockCompressed) {
2277        val.readFields(valIn);
2278        
2279        if (valIn.read() > 0) {
2280          LOG.info("available bytes: " + valIn.available());
2281          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2282                                + " bytes, should read " +
2283                                (valBuffer.getLength()-keyLength));
2284        }
2285      } else {
2286        // Get the value
2287        int valLength = WritableUtils.readVInt(valLenIn);
2288        val.readFields(valIn);
2289        
2290        // Read another compressed 'value'
2291        --noBufferedValues;
2292        
2293        // Sanity check
2294        if ((valLength < 0) && LOG.isDebugEnabled()) {
2295          LOG.debug(val + " is a zero-length value");
2296        }
2297      }
2298
2299    }
2300    
2301    /**
2302     * Get the 'value' corresponding to the last read 'key'.
2303     * @param val : The 'value' to be read.
2304     * @throws IOException
2305     */
2306    public synchronized Object getCurrentValue(Object val) 
2307      throws IOException {
2308      if (val instanceof Configurable) {
2309        ((Configurable) val).setConf(this.conf);
2310      }
2311
2312      // Position stream to 'current' value
2313      seekToCurrentValue();
2314
2315      if (!blockCompressed) {
2316        val = deserializeValue(val);
2317        
2318        if (valIn.read() > 0) {
2319          LOG.info("available bytes: " + valIn.available());
2320          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2321                                + " bytes, should read " +
2322                                (valBuffer.getLength()-keyLength));
2323        }
2324      } else {
2325        // Get the value
2326        int valLength = WritableUtils.readVInt(valLenIn);
2327        val = deserializeValue(val);
2328        
2329        // Read another compressed 'value'
2330        --noBufferedValues;
2331        
2332        // Sanity check
2333        if ((valLength < 0) && LOG.isDebugEnabled()) {
2334          LOG.debug(val + " is a zero-length value");
2335        }
2336      }
2337      return val;
2338
2339    }
2340
2341    @SuppressWarnings("unchecked")
2342    private Object deserializeValue(Object val) throws IOException {
2343      return valDeserializer.deserialize(val);
2344    }
2345    
2346    /** Read the next key in the file into <code>key</code>, skipping its
2347     * value.  True if another entry exists, and false at end of file. */
2348    public synchronized boolean next(Writable key) throws IOException {
2349      if (key.getClass() != getKeyClass())
2350        throw new IOException("wrong key class: "+key.getClass().getName()
2351                              +" is not "+keyClass);
2352
2353      if (!blockCompressed) {
2354        outBuf.reset();
2355        
2356        keyLength = next(outBuf);
2357        if (keyLength < 0)
2358          return false;
2359        
2360        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2361        
2362        key.readFields(valBuffer);
2363        valBuffer.mark(0);
2364        if (valBuffer.getPosition() != keyLength)
2365          throw new IOException(key + " read " + valBuffer.getPosition()
2366                                + " bytes, should read " + keyLength);
2367      } else {
2368        //Reset syncSeen
2369        syncSeen = false;
2370        
2371        if (noBufferedKeys == 0) {
2372          try {
2373            readBlock();
2374          } catch (EOFException eof) {
2375            return false;
2376          }
2377        }
2378        
2379        int keyLength = WritableUtils.readVInt(keyLenIn);
2380        
2381        // Sanity check
2382        if (keyLength < 0) {
2383          return false;
2384        }
2385        
2386        //Read another compressed 'key'
2387        key.readFields(keyIn);
2388        --noBufferedKeys;
2389      }
2390
2391      return true;
2392    }
2393
2394    /** Read the next key/value pair in the file into <code>key</code> and
2395     * <code>val</code>.  Returns true if such a pair exists and false when at
2396     * end of file */
2397    public synchronized boolean next(Writable key, Writable val)
2398      throws IOException {
2399      if (val.getClass() != getValueClass())
2400        throw new IOException("wrong value class: "+val+" is not "+valClass);
2401
2402      boolean more = next(key);
2403      
2404      if (more) {
2405        getCurrentValue(val);
2406      }
2407
2408      return more;
2409    }
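    // Typical iteration sketch (the key/value types are assumptions and must
    // match getKeyClass()/getValueClass(), e.g. Text/IntWritable):
    //   Text key = new Text();
    //   IntWritable val = new IntWritable();
    //   while (reader.next(key, val)) {
    //     // ... process key/val
    //   }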
2410    
2411    /**
2412     * Read and return the next record length, potentially skipping over 
2413     * a sync block.
2414     * @return the length of the next record or -1 if there is no next record
2415     * @throws IOException
2416     */
2417    private synchronized int readRecordLength() throws IOException {
2418      if (in.getPos() >= end) {
2419        return -1;
2420      }      
2421      int length = in.readInt();
2422      if (version > 1 && sync != null &&
2423          length == SYNC_ESCAPE) {              // process a sync entry
2424        in.readFully(syncCheck);                // read syncCheck
2425        if (!Arrays.equals(sync, syncCheck))    // check it
2426          throw new IOException("File is corrupt!");
2427        syncSeen = true;
2428        if (in.getPos() >= end) {
2429          return -1;
2430        }
2431        length = in.readInt();                  // re-read length
2432      } else {
2433        syncSeen = false;
2434      }
2435      
2436      return length;
2437    }
2438    
    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method.
     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
     */
    @Deprecated
2445    synchronized int next(DataOutputBuffer buffer) throws IOException {
2446      // Unsupported for block-compressed sequence files
2447      if (blockCompressed) {
2448        throw new IOException("Unsupported call for block-compressed" +
2449                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
2450      }
2451      try {
2452        int length = readRecordLength();
2453        if (length == -1) {
2454          return -1;
2455        }
2456        int keyLength = in.readInt();
2457        buffer.write(in, length);
2458        return keyLength;
2459      } catch (ChecksumException e) {             // checksum failure
2460        handleChecksumException(e);
2461        return next(buffer);
2462      }
2463    }
2464
2465    public ValueBytes createValueBytes() {
2466      ValueBytes val = null;
2467      if (!decompress || blockCompressed) {
2468        val = new UncompressedBytes();
2469      } else {
2470        val = new CompressedBytes(codec);
2471      }
2472      return val;
2473    }
2474
2475    /**
2476     * Read 'raw' records.
2477     * @param key - The buffer into which the key is read
2478     * @param val - The 'raw' value
2479     * @return Returns the total record length or -1 for end of file
2480     * @throws IOException
2481     */
2482    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2483      throws IOException {
2484      if (!blockCompressed) {
2485        int length = readRecordLength();
2486        if (length == -1) {
2487          return -1;
2488        }
2489        int keyLength = in.readInt();
2490        int valLength = length - keyLength;
2491        key.write(in, keyLength);
2492        if (decompress) {
2493          CompressedBytes value = (CompressedBytes)val;
2494          value.reset(in, valLength);
2495        } else {
2496          UncompressedBytes value = (UncompressedBytes)val;
2497          value.reset(in, valLength);
2498        }
2499        
2500        return length;
2501      } else {
2502        //Reset syncSeen
2503        syncSeen = false;
2504        
2505        // Read 'key'
2506        if (noBufferedKeys == 0) {
2507          if (in.getPos() >= end) 
2508            return -1;
2509
2510          try { 
2511            readBlock();
2512          } catch (EOFException eof) {
2513            return -1;
2514          }
2515        }
2516        int keyLength = WritableUtils.readVInt(keyLenIn);
2517        if (keyLength < 0) {
2518          throw new IOException("zero length key found!");
2519        }
2520        key.write(keyIn, keyLength);
2521        --noBufferedKeys;
2522        
2523        // Read raw 'value'
2524        seekToCurrentValue();
2525        int valLength = WritableUtils.readVInt(valLenIn);
2526        UncompressedBytes rawValue = (UncompressedBytes)val;
2527        rawValue.reset(valIn, valLength);
2528        --noBufferedValues;
2529        
2530        return (keyLength+valLength);
2531      }
2532      
2533    }
2534
2535    /**
2536     * Read 'raw' keys.
2537     * @param key - The buffer into which the key is read
2538     * @return Returns the key length or -1 for end of file
2539     * @throws IOException
2540     */
2541    public synchronized int nextRawKey(DataOutputBuffer key) 
2542      throws IOException {
2543      if (!blockCompressed) {
2544        recordLength = readRecordLength();
2545        if (recordLength == -1) {
2546          return -1;
2547        }
2548        keyLength = in.readInt();
2549        key.write(in, keyLength);
2550        return keyLength;
2551      } else {
2552        //Reset syncSeen
2553        syncSeen = false;
2554        
2555        // Read 'key'
2556        if (noBufferedKeys == 0) {
2557          if (in.getPos() >= end) 
2558            return -1;
2559
2560          try { 
2561            readBlock();
2562          } catch (EOFException eof) {
2563            return -1;
2564          }
2565        }
2566        int keyLength = WritableUtils.readVInt(keyLenIn);
2567        if (keyLength < 0) {
2568          throw new IOException("zero length key found!");
2569        }
2570        key.write(keyIn, keyLength);
2571        --noBufferedKeys;
2572        
2573        return keyLength;
2574      }
2575      
2576    }
2577
2578    /** Read the next key in the file, skipping its
2579     * value.  Return null at end of file. */
2580    public synchronized Object next(Object key) throws IOException {
2581      if (key != null && key.getClass() != getKeyClass()) {
2582        throw new IOException("wrong key class: "+key.getClass().getName()
2583                              +" is not "+keyClass);
2584      }
2585
2586      if (!blockCompressed) {
2587        outBuf.reset();
2588        
2589        keyLength = next(outBuf);
2590        if (keyLength < 0)
2591          return null;
2592        
2593        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2594        
2595        key = deserializeKey(key);
2596        valBuffer.mark(0);
2597        if (valBuffer.getPosition() != keyLength)
2598          throw new IOException(key + " read " + valBuffer.getPosition()
2599                                + " bytes, should read " + keyLength);
2600      } else {
2601        //Reset syncSeen
2602        syncSeen = false;
2603        
2604        if (noBufferedKeys == 0) {
2605          try {
2606            readBlock();
2607          } catch (EOFException eof) {
2608            return null;
2609          }
2610        }
2611        
2612        int keyLength = WritableUtils.readVInt(keyLenIn);
2613        
2614        // Sanity check
2615        if (keyLength < 0) {
2616          return null;
2617        }
2618        
2619        //Read another compressed 'key'
2620        key = deserializeKey(key);
2621        --noBufferedKeys;
2622      }
2623
2624      return key;
2625    }
2626
2627    @SuppressWarnings("unchecked")
2628    private Object deserializeKey(Object key) throws IOException {
2629      return keyDeserializer.deserialize(key);
2630    }
2631
2632    /**
2633     * Read 'raw' values.
2634     * @param val - The 'raw' value
2635     * @return Returns the value length
2636     * @throws IOException
2637     */
2638    public synchronized int nextRawValue(ValueBytes val) 
2639      throws IOException {
2640      
2641      // Position stream to current value
2642      seekToCurrentValue();
2643 
2644      if (!blockCompressed) {
2645        int valLength = recordLength - keyLength;
2646        if (decompress) {
2647          CompressedBytes value = (CompressedBytes)val;
2648          value.reset(in, valLength);
2649        } else {
2650          UncompressedBytes value = (UncompressedBytes)val;
2651          value.reset(in, valLength);
2652        }
2653         
2654        return valLength;
2655      } else {
2656        int valLength = WritableUtils.readVInt(valLenIn);
2657        UncompressedBytes rawValue = (UncompressedBytes)val;
2658        rawValue.reset(valIn, valLength);
2659        --noBufferedValues;
2660        return valLength;
2661      }
2662      
2663    }
2664
2665    private void handleChecksumException(ChecksumException e)
2666      throws IOException {
2667      if (this.conf.getBoolean(
2668          IO_SKIP_CHECKSUM_ERRORS_KEY, IO_SKIP_CHECKSUM_ERRORS_DEFAULT)) {
2669        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2670        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2671      } else {
2672        throw e;
2673      }
2674    }
2675
    /** Disables sync-marker processing; often invoked for temporary files. */
2677    synchronized void ignoreSync() {
2678      sync = null;
2679    }
2680    
2681    /** Set the current byte position in the input file.
2682     *
2683     * <p>The position passed must be a position returned by {@link
2684     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2685     * position, use {@link SequenceFile.Reader#sync(long)}.
2686     */
2687    public synchronized void seek(long position) throws IOException {
2688      in.seek(position);
2689      if (blockCompressed) {                      // trigger block read
2690        noBufferedKeys = 0;
2691        valuesDecompressed = true;
2692      }
2693    }
2694
2695    /** Seek to the next sync mark past a given position.*/
2696    public synchronized void sync(long position) throws IOException {
2697      if (position+SYNC_SIZE >= end) {
2698        seek(end);
2699        return;
2700      }
2701
2702      if (position < headerEnd) {
2703        // seek directly to first record
2704        in.seek(headerEnd);
2705        // note the sync marker "seen" in the header
2706        syncSeen = true;
2707        return;
2708      }
2709
2710      try {
2711        seek(position+4);                         // skip escape
2712        in.readFully(syncCheck);
2713        int syncLen = sync.length;
2714        for (int i = 0; in.getPos() < end; i++) {
2715          int j = 0;
2716          for (; j < syncLen; j++) {
2717            if (sync[j] != syncCheck[(i+j)%syncLen])
2718              break;
2719          }
2720          if (j == syncLen) {
2721            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2722            return;
2723          }
2724          syncCheck[i%syncLen] = in.readByte();
2725        }
2726      } catch (ChecksumException e) {             // checksum failure
2727        handleChecksumException(e);
2728      }
2729    }
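    // Illustrative split-reading sketch: position at the first sync mark
    // past 'start', then consume records until passing 'start + length':
    //   reader.sync(start);
    //   while (reader.next(key) && reader.getPosition() < start + length) {
    //     // ... records belonging to this split
    //   }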
2730
2731    /** Returns true iff the previous call to next passed a sync mark.*/
2732    public synchronized boolean syncSeen() { return syncSeen; }
2733
2734    /** Return the current byte position in the input file. */
2735    public synchronized long getPosition() throws IOException {
2736      return in.getPos();
2737    }
2738
2739    /** Returns the name of the file. */
2740    @Override
2741    public String toString() {
2742      return filename;
2743    }
2744
  } // class Reader
2746
2747  /** Sorts key/value pairs in a sequence-format file.
2748   *
2749   * <p>For best performance, applications should make sure that the {@link
2750   * Writable#readFields(DataInput)} implementation of their keys is
2751   * very efficient.  In particular, it should avoid allocating memory.
2752   */
2753  public static class Sorter {
2754
2755    private RawComparator comparator;
2756
2757    private MergeSort mergeSort; //the implementation of merge sort
2758    
2759    private Path[] inFiles;                     // when merging or sorting
2760
2761    private Path outFile;
2762
2763    private int memory; // bytes
2764    private int factor; // merged per pass
2765
2766    private FileSystem fs = null;
2767
2768    private Class keyClass;
2769    private Class valClass;
2770
2771    private Configuration conf;
2772    private Metadata metadata;
2773    
2774    private Progressable progressable = null;
2775
2776    /** Sort and merge files containing the named classes. */
2777    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2778                  Class valClass, Configuration conf)  {
2779      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
2780    }
2781
2782    /** Sort and merge using an arbitrary {@link RawComparator}. */
2783    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2784                  Class valClass, Configuration conf) {
2785      this(fs, comparator, keyClass, valClass, conf, new Metadata());
2786    }
2787
2788    /** Sort and merge using an arbitrary {@link RawComparator}. */
2789    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2790                  Class valClass, Configuration conf, Metadata metadata) {
2791      this.fs = fs;
2792      this.comparator = comparator;
2793      this.keyClass = keyClass;
2794      this.valClass = valClass;
2795      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2796      this.factor = conf.getInt("io.sort.factor", 100);
2797      this.conf = conf;
2798      this.metadata = metadata;
2799    }
2800
2801    /** Set the number of streams to merge at once.*/
2802    public void setFactor(int factor) { this.factor = factor; }
2803
2804    /** Get the number of streams to merge at once.*/
2805    public int getFactor() { return factor; }
2806
2807    /** Set the total amount of buffer memory, in bytes.*/
2808    public void setMemory(int memory) { this.memory = memory; }
2809
2810    /** Get the total amount of buffer memory, in bytes.*/
2811    public int getMemory() { return memory; }
2812
2813    /** Set the progressable object in order to report progress. */
2814    public void setProgressable(Progressable progressable) {
2815      this.progressable = progressable;
2816    }
2817    
2818    /** 
2819     * Perform a file sort from a set of input files into an output file.
2820     * @param inFiles the files to be sorted
2821     * @param outFile the sorted output file
2822     * @param deleteInput should the input files be deleted as they are read?
2823     */
2824    public void sort(Path[] inFiles, Path outFile,
2825                     boolean deleteInput) throws IOException {
2826      if (fs.exists(outFile)) {
2827        throw new IOException("already exists: " + outFile);
2828      }
2829
2830      this.inFiles = inFiles;
2831      this.outFile = outFile;
2832
2833      int segments = sortPass(deleteInput);
2834      if (segments > 1) {
2835        mergePass(outFile.getParent());
2836      }
2837    }
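    // Illustrative usage sketch (the paths and key/value classes are
    // assumptions):
    //   SequenceFile.Sorter sorter =
    //       new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);
    //   sorter.sort(new Path[] { input }, sorted, false);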
2838
2839    /** 
2840     * Perform a file sort from a set of input files and return an iterator.
2841     * @param inFiles the files to be sorted
2842     * @param tempDir the directory where temp files are created during sort
2843     * @param deleteInput should the input files be deleted as they are read?
     * @return a RawKeyValueIterator over the sorted data
2845     */
2846    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2847                                              boolean deleteInput) throws IOException {
2848      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2849      if (fs.exists(outFile)) {
2850        throw new IOException("already exists: " + outFile);
2851      }
2852      this.inFiles = inFiles;
      // outFile will be used as the prefix for temp files when the sort
      // produces multiple sorted segments. For the single-segment case,
      // outFile itself will contain the sorted data for that segment.
2857      this.outFile = outFile;
2858
2859      int segments = sortPass(deleteInput);
2860      if (segments > 1)
2861        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2862                     tempDir);
2863      else if (segments == 1)
2864        return merge(new Path[]{outFile}, true, tempDir);
2865      else return null;
2866    }
2867
2868    /**
2869     * The backwards compatible interface to sort.
2870     * @param inFile the input file to sort
2871     * @param outFile the sorted output file
2872     */
2873    public void sort(Path inFile, Path outFile) throws IOException {
2874      sort(new Path[]{inFile}, outFile, false);
2875    }
2876    
2877    private int sortPass(boolean deleteInput) throws IOException {
2878      if(LOG.isDebugEnabled()) {
2879        LOG.debug("running sort pass");
2880      }
2881      SortPass sortPass = new SortPass();         // make the SortPass
2882      sortPass.setProgressable(progressable);
2883      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2884      try {
2885        return sortPass.run(deleteInput);         // run it
2886      } finally {
2887        sortPass.close();                         // close it
2888      }
2889    }
2890
2891    private class SortPass {
2892      private int memoryLimit = memory/4;
2893      private int recordLimit = 1000000;
2894      
2895      private DataOutputBuffer rawKeys = new DataOutputBuffer();
2896      private byte[] rawBuffer;
2897
2898      private int[] keyOffsets = new int[1024];
2899      private int[] pointers = new int[keyOffsets.length];
2900      private int[] pointersCopy = new int[keyOffsets.length];
2901      private int[] keyLengths = new int[keyOffsets.length];
2902      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2903      
2904      private ArrayList segmentLengths = new ArrayList();
2905      
2906      private Reader in = null;
2907      private FSDataOutputStream out = null;
2908      private FSDataOutputStream indexOut = null;
2909      private Path outName;
2910
2911      private Progressable progressable = null;
2912
2913      public int run(boolean deleteInput) throws IOException {
2914        int segments = 0;
2915        int currentFile = 0;
2916        boolean atEof = (currentFile >= inFiles.length);
2917        CompressionType compressionType;
2918        CompressionCodec codec = null;
2919        segmentLengths.clear();
2920        if (atEof) {
2921          return 0;
2922        }
2923        
2924        // Initialize
2925        in = new Reader(fs, inFiles[currentFile], conf);
2926        compressionType = in.getCompressionType();
2927        codec = in.getCompressionCodec();
2928        
2929        for (int i=0; i < rawValues.length; ++i) {
2930          rawValues[i] = null;
2931        }
2932        
2933        while (!atEof) {
2934          int count = 0;
2935          int bytesProcessed = 0;
2936          rawKeys.reset();
2937          while (!atEof && 
2938                 bytesProcessed < memoryLimit && count < recordLimit) {
2939
2940            // Read a record into buffer
2941            // Note: Attempt to re-use 'rawValue' as far as possible
2942            int keyOffset = rawKeys.getLength();       
2943            ValueBytes rawValue = 
2944              (count == keyOffsets.length || rawValues[count] == null) ? 
2945              in.createValueBytes() : 
2946              rawValues[count];
2947            int recordLength = in.nextRaw(rawKeys, rawValue);
2948            if (recordLength == -1) {
2949              in.close();
2950              if (deleteInput) {
2951                fs.delete(inFiles[currentFile], true);
2952              }
2953              currentFile += 1;
2954              atEof = currentFile >= inFiles.length;
2955              if (!atEof) {
2956                in = new Reader(fs, inFiles[currentFile], conf);
2957              } else {
2958                in = null;
2959              }
2960              continue;
2961            }
2962
2963            int keyLength = rawKeys.getLength() - keyOffset;
2964
2965            if (count == keyOffsets.length)
2966              grow();
2967
2968            keyOffsets[count] = keyOffset;                // update pointers
2969            pointers[count] = count;
2970            keyLengths[count] = keyLength;
2971            rawValues[count] = rawValue;
2972
2973            bytesProcessed += recordLength; 
2974            count++;
2975          }
2976
2977          // buffer is full -- sort & flush it
2978          if(LOG.isDebugEnabled()) {
2979            LOG.debug("flushing segment " + segments);
2980          }
2981          rawBuffer = rawKeys.getData();
2982          sort(count);
2983          // indicate we're making progress
2984          if (progressable != null) {
2985            progressable.progress();
2986          }
2987          flush(count, bytesProcessed, compressionType, codec, 
2988                segments==0 && atEof);
2989          segments++;
2990        }
2991        return segments;
2992      }
2993
2994      public void close() throws IOException {
2995        if (in != null) {
2996          in.close();
2997        }
2998        if (out != null) {
2999          out.close();
3000        }
3001        if (indexOut != null) {
3002          indexOut.close();
3003        }
3004      }
3005
3006      private void grow() {
3007        int newLength = keyOffsets.length * 3 / 2;
3008        keyOffsets = grow(keyOffsets, newLength);
3009        pointers = grow(pointers, newLength);
3010        pointersCopy = new int[newLength];
3011        keyLengths = grow(keyLengths, newLength);
3012        rawValues = grow(rawValues, newLength);
3013      }
3014
3015      private int[] grow(int[] old, int newLength) {
3016        int[] result = new int[newLength];
3017        System.arraycopy(old, 0, result, 0, old.length);
3018        return result;
3019      }
3020      
3021      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
3022        ValueBytes[] result = new ValueBytes[newLength];
3023        System.arraycopy(old, 0, result, 0, old.length);
3024        for (int i=old.length; i < newLength; ++i) {
3025          result[i] = null;
3026        }
3027        return result;
3028      }
3029
3030      private void flush(int count, int bytesProcessed, 
3031                         CompressionType compressionType, 
3032                         CompressionCodec codec, 
3033                         boolean done) throws IOException {
3034        if (out == null) {
3035          outName = done ? outFile : outFile.suffix(".0");
3036          out = fs.create(outName);
3037          if (!done) {
3038            indexOut = fs.create(outName.suffix(".index"));
3039          }
3040        }
3041
3042        long segmentStart = out.getPos();
3043        Writer writer = createWriter(conf, Writer.stream(out), 
3044            Writer.keyClass(keyClass), Writer.valueClass(valClass),
3045            Writer.compression(compressionType, codec),
3046            Writer.metadata(done ? metadata : new Metadata()));
3047        
3048        if (!done) {
3049          writer.sync = null;                     // disable sync on temp files
3050        }
3051
3052        for (int i = 0; i < count; i++) {         // write in sorted order
3053          int p = pointers[i];
3054          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
3055        }
3056        writer.close();
3057        
3058        if (!done) {
          // Record this segment's (start offset, length) pair in the index
          // file as VLongs; SegmentContainer reads these back during the
          // final merge pass
          WritableUtils.writeVLong(indexOut, segmentStart);
          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
3062          indexOut.flush();
3063        }
3064      }
3065
3066      private void sort(int count) {
3067        System.arraycopy(pointers, 0, pointersCopy, 0, count);
3068        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
3069      }
3070      class SeqFileComparator implements Comparator<IntWritable> {
3071        @Override
3072        public int compare(IntWritable I, IntWritable J) {
3073          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
3074                                    keyLengths[I.get()], rawBuffer, 
3075                                    keyOffsets[J.get()], keyLengths[J.get()]);
3076        }
3077      }
3078      
      /** Sets the progressable object in order to report progress. */
      public void setProgressable(Progressable progressable) {
3082        this.progressable = progressable;
3083      }
3084      
3085    } // SequenceFile.Sorter.SortPass
3086
    /** The interface to iterate over raw keys/values of SequenceFiles. */
    public interface RawKeyValueIterator {
      /** Gets the current raw key.
       * @return the current key as a DataOutputBuffer
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException; 
      /** Gets the current raw value.
       * @return the current value as ValueBytes 
       * @throws IOException
       */
      ValueBytes getValue() throws IOException; 
      /** Sets up the current key and value (for getKey and getValue).
       * @return true if a key/value pair exists, false otherwise 
       * @throws IOException
       */
      boolean next() throws IOException;
      /** Closes the iterator so that the underlying streams can be closed.
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; the float it holds (0.0 - 1.0) 
       * indicates the fraction of bytes processed by the iterator so far.
       */
      Progress getProgress();
    }
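
    /* Usage sketch (illustrative): draining a RawKeyValueIterator returned by
     * one of the merge methods below. 'sorter', 'segments' and 'tmpDir' are
     * assumed to be in scope.
     *
     *   RawKeyValueIterator iter = sorter.merge(segments, tmpDir);
     *   try {
     *     while (iter.next()) {
     *       DataOutputBuffer key = iter.getKey(); // valid bytes: [0, key.getLength())
     *       ValueBytes value = iter.getValue();
     *       // consume the raw key/value pair, e.g. via Writer.appendRaw(...)
     *     }
     *   } finally {
     *     iter.close();                           // closes the underlying streams
     *   }
     */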
3113    
3114    /**
3115     * Merges the list of segments of type <code>SegmentDescriptor</code>
3116     * @param segments the list of SegmentDescriptors
3117     * @param tmpDir the directory to write temporary files into
3118     * @return RawKeyValueIterator
3119     * @throws IOException
3120     */
3121    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
3122                                     Path tmpDir) 
3123      throws IOException {
3124      // pass in object to report progress, if present
3125      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3126      return mQueue.merge();
3127    }
3128
    /**
     * Merges the contents of files passed in Path[] using the merge factor
     * that has already been set as the maximum fan-in
     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted once they
     * are no longer needed
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
     * @throws IOException
     */
3139    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3140                                     Path tmpDir) 
3141      throws IOException {
3142      return merge(inNames, deleteInputs, 
3143                   (inNames.length < factor) ? inNames.length : factor,
3144                   tmpDir);
3145    }
3146
3147    /**
3148     * Merges the contents of files passed in Path[]
3149     * @param inNames the array of path names
     * @param deleteInputs true if the input files should be deleted once they
     * are no longer needed
     * @param factor the factor that will be used as the maximum merge fan-in
     * @param tmpDir the directory to write temporary files into
     * @return RawKeyValueIterator
3155     * @throws IOException
3156     */
3157    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3158                                     int factor, Path tmpDir) 
3159      throws IOException {
3160      //get the segments from inNames
3161      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3162      for (int i = 0; i < inNames.length; i++) {
3163        SegmentDescriptor s = new SegmentDescriptor(0,
3164            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3165        s.preserveInput(!deleteInputs);
3166        s.doSync();
3167        a.add(s);
3168      }
3169      this.factor = factor;
3170      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3171      return mQueue.merge();
3172    }
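
    /* Usage sketch (illustrative): merging on-disk files with an explicit
     * fan-in; a smaller factor trades extra intermediate passes for lower
     * memory and file-handle pressure. 'sorter' is assumed to be in scope.
     *
     *   Path[] inputs = { new Path("part-0"), new Path("part-1"), new Path("part-2") };
     *   RawKeyValueIterator iter = sorter.merge(inputs, false, 2, new Path("/tmp/merge"));
     */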
3173
3174    /**
3175     * Merges the contents of files passed in Path[]
3176     * @param inNames the array of path names
3177     * @param tempDir the directory for creating temp files during merge
     * @param deleteInputs true if the input files should be deleted once they
     * are no longer needed
     * @return RawKeyValueIterator
3181     * @throws IOException
3182     */
3183    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3184                                     boolean deleteInputs) 
3185      throws IOException {
      //outFile is used as the prefix for the temp files holding the
      //intermediate merge outputs
3188      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3189      //get the segments from inNames
3190      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3191      for (int i = 0; i < inNames.length; i++) {
3192        SegmentDescriptor s = new SegmentDescriptor(0,
3193            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3194        s.preserveInput(!deleteInputs);
3195        s.doSync();
3196        a.add(s);
3197      }
3198      factor = (inNames.length < factor) ? inNames.length : factor;
3199      // pass in object to report progress, if present
3200      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3201      return mQueue.merge();
3202    }
3203
    /**
     * Clones the attributes (like compression) of the input file and creates a 
     * corresponding Writer
3207     * @param inputFile the path of the input file whose attributes should be 
3208     * cloned
3209     * @param outputFile the path of the output file 
3210     * @param prog the Progressable to report status during the file write
3211     * @return Writer
3212     * @throws IOException
3213     */
3214    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3215                                      Progressable prog) throws IOException {
3216      Reader reader = new Reader(conf,
3217                                 Reader.file(inputFile),
3218                                 new Reader.OnlyHeaderOption());
3219      CompressionType compress = reader.getCompressionType();
3220      CompressionCodec codec = reader.getCompressionCodec();
3221      reader.close();
3222
3223      Writer writer = createWriter(conf, 
3224                                   Writer.file(outputFile), 
3225                                   Writer.keyClass(keyClass), 
3226                                   Writer.valueClass(valClass), 
3227                                   Writer.compression(compress, codec), 
3228                                   Writer.progressable(prog));
3229      return writer;
3230    }
3231
3232    /**
3233     * Writes records from RawKeyValueIterator into a file represented by the 
3234     * passed writer
3235     * @param records the RawKeyValueIterator
3236     * @param writer the Writer created earlier 
3237     * @throws IOException
3238     */
3239    public void writeFile(RawKeyValueIterator records, Writer writer) 
3240      throws IOException {
3241      while(records.next()) {
3242        writer.appendRaw(records.getKey().getData(), 0, 
3243                         records.getKey().getLength(), records.getValue());
3244      }
3245      writer.sync();
3246    }
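
    /* Usage sketch (illustrative): cloneFileAttributes and writeFile combine
     * to re-write a sorted record stream with the compression settings of an
     * existing file, which is how merge(Path[], Path) below uses them.
     * 'sorter', 'template', 'dest' and 'iter' are assumed to be in scope.
     *
     *   Writer writer = sorter.cloneFileAttributes(template, dest, null);
     *   sorter.writeFile(iter, writer);  // syncs, but does not close, the writer
     *   writer.close();
     */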
3247        
3248    /** Merge the provided files.
3249     * @param inFiles the array of input path names
3250     * @param outFile the final output file
3251     * @throws IOException
3252     */
3253    public void merge(Path[] inFiles, Path outFile) throws IOException {
3254      if (fs.exists(outFile)) {
3255        throw new IOException("already exists: " + outFile);
3256      }
3257      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3258      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3259      
3260      writeFile(r, writer);
3261
3262      writer.close();
3263    }
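
    /* Usage sketch (illustrative): the convenience entry point above merges
     * pre-sorted SequenceFiles into a single output, cloning the first
     * input's compression settings. The Sorter constructor shown here is an
     * assumption about the enclosing class's API.
     *
     *   SequenceFile.Sorter sorter =
     *     new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);
     *   sorter.merge(new Path[] { new Path("sorted-0"), new Path("sorted-1") },
     *                new Path("merged"));
     */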
3264
    /** Called by sort to generate the final merged output. */
3266    private int mergePass(Path tmpDir) throws IOException {
3267      if(LOG.isDebugEnabled()) {
3268        LOG.debug("running merge pass");
3269      }
3270      Writer writer = cloneFileAttributes(
3271                                          outFile.suffix(".0"), outFile, null);
3272      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3273                                    outFile.suffix(".0.index"), tmpDir);
3274      writeFile(r, writer);
3275
3276      writer.close();
3277      return 0;
3278    }
3279
3280    /** Used by mergePass to merge the output of the sort
3281     * @param inName the name of the input file containing sorted segments
3282     * @param indexIn the offsets of the sorted segments
3283     * @param tmpDir the relative directory to store intermediate results in
3284     * @return RawKeyValueIterator
3285     * @throws IOException
3286     */
3287    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3288      throws IOException {
3289      //get the segments from indexIn
      //we create a SegmentContainer so that we can track the segments belonging
      //to inName and delete inName as soon as all the contained segments have
      //been looked at during the merge process and are no longer needed
3294      SegmentContainer container = new SegmentContainer(inName, indexIn);
3295      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3296      return mQueue.merge();
3297    }
3298    
3299    /** This class implements the core of the merge logic */
3300    private class MergeQueue extends PriorityQueue 
3301      implements RawKeyValueIterator {
3302      private boolean compress;
3303      private boolean blockCompress;
3304      private DataOutputBuffer rawKey = new DataOutputBuffer();
3305      private ValueBytes rawValue;
3306      private long totalBytesProcessed;
3307      private float progPerByte;
3308      private Progress mergeProgress = new Progress();
3309      private Path tmpDir;
3310      private Progressable progress = null; //handle to the progress reporting object
3311      private SegmentDescriptor minSegment;
3312      
      //a TreeMap used to store the segments sorted by size (segment offset and
      //segment path name are used to break ties between segments of equal size)
3315      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3316        new TreeMap<SegmentDescriptor, Void>();
3317            
3318      @SuppressWarnings("unchecked")
3319      public void put(SegmentDescriptor stream) throws IOException {
3320        if (size() == 0) {
3321          compress = stream.in.isCompressed();
3322          blockCompress = stream.in.isBlockCompressed();
3323        } else if (compress != stream.in.isCompressed() || 
3324                   blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException(
              "All files to merge must use the same compression scheme.");
3326        } 
3327        super.put(stream);
3328      }
3329      
3330      /**
3331       * A queue of file segments to merge
3332       * @param segments the file segments to merge
3333       * @param tmpDir a relative local directory to save intermediate files in
3334       * @param progress the reference to the Progressable object
3335       */
3336      public MergeQueue(List <SegmentDescriptor> segments,
3337          Path tmpDir, Progressable progress) {
3338        int size = segments.size();
3339        for (int i = 0; i < size; i++) {
3340          sortedSegmentSizes.put(segments.get(i), null);
3341        }
3342        this.tmpDir = tmpDir;
3343        this.progress = progress;
3344      }
3345      @Override
3346      protected boolean lessThan(Object a, Object b) {
3347        // indicate we're making progress
3348        if (progress != null) {
3349          progress.progress();
3350        }
3351        SegmentDescriptor msa = (SegmentDescriptor)a;
3352        SegmentDescriptor msb = (SegmentDescriptor)b;
3353        return comparator.compare(msa.getKey().getData(), 0, 
3354                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
3355                                  msb.getKey().getLength()) < 0;
3356      }
3357      @Override
3358      public void close() throws IOException {
3359        SegmentDescriptor ms;                           // close inputs
3360        while ((ms = (SegmentDescriptor)pop()) != null) {
3361          ms.cleanup();
3362        }
3363        minSegment = null;
3364      }
3365      @Override
3366      public DataOutputBuffer getKey() throws IOException {
3367        return rawKey;
3368      }
3369      @Override
3370      public ValueBytes getValue() throws IOException {
3371        return rawValue;
3372      }
3373      @Override
3374      public boolean next() throws IOException {
3375        if (size() == 0)
3376          return false;
3377        if (minSegment != null) {
3378          //minSegment is non-null for all invocations of next except the first
3379          //one. For the first invocation, the priority queue is ready for use
3380          //but for the subsequent invocations, first adjust the queue 
3381          adjustPriorityQueue(minSegment);
3382          if (size() == 0) {
3383            minSegment = null;
3384            return false;
3385          }
3386        }
3387        minSegment = (SegmentDescriptor)top();
3388        long startPos = minSegment.in.getPosition(); // Current position in stream
3389        //save the raw key reference
3390        rawKey = minSegment.getKey();
3391        //load the raw value. Re-use the existing rawValue buffer
3392        if (rawValue == null) {
3393          rawValue = minSegment.in.createValueBytes();
3394        }
3395        minSegment.nextRawValue(rawValue);
3396        long endPos = minSegment.in.getPosition(); // End position after reading value
3397        updateProgress(endPos - startPos);
3398        return true;
3399      }
3400      
3401      @Override
3402      public Progress getProgress() {
3403        return mergeProgress; 
3404      }
3405
3406      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3407        long startPos = ms.in.getPosition(); // Current position in stream
3408        boolean hasNext = ms.nextRawKey();
3409        long endPos = ms.in.getPosition(); // End position after reading key
3410        updateProgress(endPos - startPos);
3411        if (hasNext) {
3412          adjustTop();
3413        } else {
3414          pop();
3415          ms.cleanup();
3416        }
3417      }
3418
3419      private void updateProgress(long bytesProcessed) {
3420        totalBytesProcessed += bytesProcessed;
3421        if (progPerByte > 0) {
3422          mergeProgress.set(totalBytesProcessed * progPerByte);
3423        }
3424      }
3425      
3426      /** This is the single level merge that is called multiple times 
3427       * depending on the factor size and the number of segments
3428       * @return RawKeyValueIterator
3429       * @throws IOException
3430       */
3431      public RawKeyValueIterator merge() throws IOException {
        //pull the segments from the sorted map built in the constructor and
        //merge them; intermediate passes dump their output to temp files
3434        int numSegments = sortedSegmentSizes.size();
3435        int origFactor = factor;
3436        int passNo = 1;
3437        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3438        do {
3439          //get the factor for this pass of merge
3440          factor = getPassFactor(passNo, numSegments);
3441          List<SegmentDescriptor> segmentsToMerge =
3442            new ArrayList<SegmentDescriptor>();
3443          int segmentsConsidered = 0;
3444          int numSegmentsToConsider = factor;
3445          while (true) {
3446            //extract the smallest 'factor' number of segment pointers from the 
3447            //TreeMap. Call cleanup on the empty segments (no key/value data)
3448            SegmentDescriptor[] mStream = 
3449              getSegmentDescriptors(numSegmentsToConsider);
3450            for (int i = 0; i < mStream.length; i++) {
3451              if (mStream[i].nextRawKey()) {
3452                segmentsToMerge.add(mStream[i]);
3453                segmentsConsidered++;
3454                // Count the fact that we read some bytes in calling nextRawKey()
3455                updateProgress(mStream[i].in.getPosition());
3456              }
3457              else {
3458                mStream[i].cleanup();
3459                numSegments--; //we ignore this segment for the merge
3460              }
3461            }
3462            //if we have the desired number of segments
3463            //or looked at all available segments, we break
3464            if (segmentsConsidered == factor || 
3465                sortedSegmentSizes.size() == 0) {
3466              break;
3467            }
3468              
3469            numSegmentsToConsider = factor - segmentsConsidered;
3470          }
3471          //feed the streams to the priority queue
          initialize(segmentsToMerge.size());
          clear();
3473          for (int i = 0; i < segmentsToMerge.size(); i++) {
3474            put(segmentsToMerge.get(i));
3475          }
          //if few enough segments remain, just return the
          //iterator, else do another single level merge
3478          if (numSegments <= factor) {
3479            //calculate the length of the remaining segments. Required for 
3480            //calculating the merge progress
3481            long totalBytes = 0;
3482            for (int i = 0; i < segmentsToMerge.size(); i++) {
3483              totalBytes += segmentsToMerge.get(i).segmentLength;
3484            }
3485            if (totalBytes != 0) //being paranoid
3486              progPerByte = 1.0f / (float)totalBytes;
3487            //reset factor to what it originally was
3488            factor = origFactor;
3489            return this;
3490          } else {
3491            //we want to spread the creation of temp files on multiple disks if 
3492            //available under the space constraints
3493            long approxOutputSize = 0; 
3494            for (SegmentDescriptor s : segmentsToMerge) {
3495              approxOutputSize += s.segmentLength + 
3496                                  ChecksumFileSystem.getApproxChkSumLength(
3497                                  s.segmentLength);
3498            }
3499            Path tmpFilename = 
3500              new Path(tmpDir, "intermediate").suffix("." + passNo);
3501
3502            Path outputFile =  lDirAlloc.getLocalPathForWrite(
3503                                                tmpFilename.toString(),
3504                                                approxOutputSize, conf);
3505            if(LOG.isDebugEnabled()) { 
3506              LOG.debug("writing intermediate results to " + outputFile);
3507            }
3508            Writer writer = cloneFileAttributes(
3509                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3510                                                fs.makeQualified(outputFile), null);
3511            writer.sync = null; //disable sync for temp files
3512            writeFile(this, writer);
3513            writer.close();
3514            
3515            //we finished one single level merge; now clean up the priority 
3516            //queue
3517            this.close();
3518            
3519            SegmentDescriptor tempSegment = 
3520              new SegmentDescriptor(0,
3521                  fs.getFileStatus(outputFile).getLen(), outputFile);
3522            //put the segment back in the TreeMap
3523            sortedSegmentSizes.put(tempSegment, null);
3524            numSegments = sortedSegmentSizes.size();
3525            passNo++;
3526          }
          //only the first pass uses a reduced merge factor, so reset the 
          //factor to what it originally was
3529          factor = origFactor;
3530        } while(true);
3531      }
3532  
      //HADOOP-591
      /** Determines the fan-in for a given merge pass. For the first pass,
       * pick a factor such that every subsequent pass merges exactly
       * 'factor' segments; later passes always use the full factor.
       */
      public int getPassFactor(int passNo, int numSegments) {
3535        if (passNo > 1 || numSegments <= factor || factor == 1) 
3536          return factor;
3537        int mod = (numSegments - 1) % (factor - 1);
3538        if (mod == 0)
3539          return factor;
3540        return mod + 1;
3541      }
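
      /* Worked example for getPassFactor above (illustrative): with
       * numSegments = 14 and factor = 10, the first pass merges
       * (14 - 1) % (10 - 1) + 1 = 5 segments into one, leaving
       * 14 - 5 + 1 = 10 segments. The final pass then merges exactly
       * 'factor' segments, so no record is copied more than twice.
       */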
3542      
      /** Returns (and removes) the requested number of segment descriptors
       * from the sorted map.
       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] descriptors = 
          new SegmentDescriptor[numDescriptors];
        Iterator<SegmentDescriptor> iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          descriptors[i++] = iter.next();
          iter.remove();
        }
        return descriptors;
      }
3559    } // SequenceFile.Sorter.MergeQueue
3560
    /** This class defines a merge segment. It can be subclassed to provide a 
     * customized cleanup implementation; by default, cleanup closes the file 
     * handle and deletes the file. 
     */
3565    public class SegmentDescriptor implements Comparable {
3566      
3567      long segmentOffset; //the start of the segment in the file
3568      long segmentLength; //the length of the segment
3569      Path segmentPathName; //the path name of the file containing the segment
3570      boolean ignoreSync = true; //set to true for temp files
3571      private Reader in = null; 
3572      private DataOutputBuffer rawKey = null; //this will hold the current key
3573      private boolean preserveInput = false; //delete input segment files?
3574      
3575      /** Constructs a segment
3576       * @param segmentOffset the offset of the segment in the file
3577       * @param segmentLength the length of the segment
3578       * @param segmentPathName the path name of the file containing the segment
3579       */
3580      public SegmentDescriptor (long segmentOffset, long segmentLength, 
3581                                Path segmentPathName) {
3582        this.segmentOffset = segmentOffset;
3583        this.segmentLength = segmentLength;
3584        this.segmentPathName = segmentPathName;
3585      }
3586      
      /** Enables the sync checks; temp-file segments skip them by default. */
      public void doSync() { ignoreSync = false; }
3589      
      /** Sets whether the input file should be preserved (i.e. not deleted)
       * once it is no longer needed. */
3591      public void preserveInput(boolean preserve) {
3592        preserveInput = preserve;
3593      }
3594
3595      public boolean shouldPreserveInput() {
3596        return preserveInput;
3597      }
3598      
3599      @Override
3600      public int compareTo(Object o) {
3601        SegmentDescriptor that = (SegmentDescriptor)o;
3602        if (this.segmentLength != that.segmentLength) {
3603          return (this.segmentLength < that.segmentLength ? -1 : 1);
3604        }
3605        if (this.segmentOffset != that.segmentOffset) {
3606          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3607        }
3608        return (this.segmentPathName.toString()).
3609          compareTo(that.segmentPathName.toString());
3610      }
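
      /* Ordering example (illustrative): given segments A(length=5, offset=0,
       * path="f1"), B(length=9, offset=0, path="f1") and C(length=9, offset=0,
       * path="f2"), compareTo sorts them A, B, C: shortest first, with offset
       * and then path name breaking ties. This is what lets MergeQueue always
       * pull the smallest remaining segments for the next pass.
       */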
3611
3612      @Override
3613      public boolean equals(Object o) {
3614        if (!(o instanceof SegmentDescriptor)) {
3615          return false;
3616        }
3617        SegmentDescriptor that = (SegmentDescriptor)o;
3618        if (this.segmentLength == that.segmentLength &&
3619            this.segmentOffset == that.segmentOffset &&
3620            this.segmentPathName.toString().equals(
3621              that.segmentPathName.toString())) {
3622          return true;
3623        }
3624        return false;
3625      }
3626
3627      @Override
3628      public int hashCode() {
3629        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3630      }
3631
3632      /** Fills up the rawKey object with the key returned by the Reader
3633       * @return true if there is a key returned; false, otherwise
3634       * @throws IOException
3635       */
3636      public boolean nextRawKey() throws IOException {
3637        if (in == null) {
3638          int bufferSize = getBufferSize(conf); 
3639          Reader reader = new Reader(conf,
3640                                     Reader.file(segmentPathName), 
3641                                     Reader.bufferSize(bufferSize),
3642                                     Reader.start(segmentOffset), 
3643                                     Reader.length(segmentLength));
3644        
3645          //sometimes we ignore syncs especially for temp merge files
3646          if (ignoreSync) reader.ignoreSync();
3647
3648          if (reader.getKeyClass() != keyClass)
3649            throw new IOException("wrong key class: " + reader.getKeyClass() +
3650                                  " is not " + keyClass);
3651          if (reader.getValueClass() != valClass)
3652            throw new IOException("wrong value class: "+reader.getValueClass()+
3653                                  " is not " + valClass);
3654          this.in = reader;
3655          rawKey = new DataOutputBuffer();
3656        }
3657        rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
3660        return (keyLength >= 0);
3661      }
3662
3663      /** Fills up the passed rawValue with the value corresponding to the key
3664       * read earlier
3665       * @param rawValue
3666       * @return the length of the value
3667       * @throws IOException
3668       */
3669      public int nextRawValue(ValueBytes rawValue) throws IOException {
3670        int valLength = in.nextRawValue(rawValue);
3671        return valLength;
3672      }
3673      
3674      /** Returns the stored rawKey */
3675      public DataOutputBuffer getKey() {
3676        return rawKey;
3677      }
3678      
      /** Closes the underlying reader. */
3680      private void close() throws IOException {
3681        this.in.close();
3682        this.in = null;
3683      }
3684
3685      /** The default cleanup. Subclasses can override this with a custom 
3686       * cleanup 
3687       */
3688      public void cleanup() throws IOException {
3689        close();
3690        if (!preserveInput) {
3691          fs.delete(segmentPathName, true);
3692        }
3693      }
3694    } // SequenceFile.Sorter.SegmentDescriptor
3695    
    /** This class describes a segment that is one of several segments
     *  contained within a single file; deletion of the shared backing file
     *  is deferred until every contained segment has been consumed.
     */
3699    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3700
3701      SegmentContainer parentContainer = null;
3702
3703      /** Constructs a segment
3704       * @param segmentOffset the offset of the segment in the file
3705       * @param segmentLength the length of the segment
3706       * @param segmentPathName the path name of the file containing the segment
3707       * @param parent the parent SegmentContainer that holds the segment
3708       */
3709      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3710                                       Path segmentPathName, SegmentContainer parent) {
3711        super(segmentOffset, segmentLength, segmentPathName);
3712        this.parentContainer = parent;
3713      }
      /** Closes this segment and defers deletion of the shared backing file 
       * to the parent container, which removes it once all contained 
       * segments have been cleaned up. 
       */
3717      @Override
3718      public void cleanup() throws IOException {
3719        super.close();
3720        if (super.shouldPreserveInput()) return;
3721        parentContainer.cleanup();
3722      }
3723      
      @Override
      public boolean equals(Object o) {
        if (!(o instanceof LinkedSegmentsDescriptor)) {
          return false;
        }
        return super.equals(o);
      }

      @Override
      public int hashCode() {
        return super.hashCode();  // keep the equals/hashCode contract
      }
3731    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3732
3733    /** The class that defines a container for segments to be merged. Primarily
3734     * required to delete temp files as soon as all the contained segments
3735     * have been looked at */
3736    private class SegmentContainer {
3737      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3738      private int numSegmentsContained; //# of segments contained
3739      private Path inName; //input file from where segments are created
3740      
3741      //the list of segments read from the file
3742      private ArrayList <SegmentDescriptor> segments = 
3743        new ArrayList <SegmentDescriptor>();
3744      /** This constructor is there primarily to serve the sort routine that 
3745       * generates a single output file with an associated index file */
3746      public SegmentContainer(Path inName, Path indexIn) throws IOException {
3747        //get the segments from indexIn
3748        FSDataInputStream fsIndexIn = fs.open(indexIn);
3749        long end = fs.getFileStatus(indexIn).getLen();
3750        while (fsIndexIn.getPos() < end) {
3751          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3752          long segmentLength = WritableUtils.readVLong(fsIndexIn);
3753          Path segmentName = inName;
3754          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3755                                                    segmentLength, segmentName, this));
3756        }
3757        fsIndexIn.close();
3758        fs.delete(indexIn, true);
3759        numSegmentsContained = segments.size();
3760        this.inName = inName;
3761      }
3762
3763      public List <SegmentDescriptor> getSegmentList() {
3764        return segments;
3765      }
3766      public void cleanup() throws IOException {
3767        numSegmentsCleanedUp++;
3768        if (numSegmentsCleanedUp == numSegmentsContained) {
3769          fs.delete(inName, true);
3770        }
3771      }
3772    } //SequenceFile.Sorter.SegmentContainer
3773
3774  } // SequenceFile.Sorter
3775
3776} // SequenceFile