001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022import java.util.*;
023import java.rmi.server.UID;
024import java.security.MessageDigest;
025import org.apache.commons.logging.*;
026import org.apache.hadoop.util.Options;
027import org.apache.hadoop.fs.*;
028import org.apache.hadoop.fs.Options.CreateOpts;
029import org.apache.hadoop.io.compress.CodecPool;
030import org.apache.hadoop.io.compress.CompressionCodec;
031import org.apache.hadoop.io.compress.CompressionInputStream;
032import org.apache.hadoop.io.compress.CompressionOutputStream;
033import org.apache.hadoop.io.compress.Compressor;
034import org.apache.hadoop.io.compress.Decompressor;
035import org.apache.hadoop.io.compress.DefaultCodec;
036import org.apache.hadoop.io.compress.GzipCodec;
037import org.apache.hadoop.io.compress.zlib.ZlibFactory;
038import org.apache.hadoop.io.serializer.Deserializer;
039import org.apache.hadoop.io.serializer.Serializer;
040import org.apache.hadoop.io.serializer.SerializationFactory;
041import org.apache.hadoop.classification.InterfaceAudience;
042import org.apache.hadoop.classification.InterfaceStability;
043import org.apache.hadoop.conf.*;
044import org.apache.hadoop.util.Progressable;
045import org.apache.hadoop.util.Progress;
046import org.apache.hadoop.util.ReflectionUtils;
047import org.apache.hadoop.util.NativeCodeLoader;
048import org.apache.hadoop.util.MergeSort;
049import org.apache.hadoop.util.PriorityQueue;
050import org.apache.hadoop.util.Time;
051
052/** 
053 * <code>SequenceFile</code>s are flat files consisting of binary key/value 
054 * pairs.
055 * 
056 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
057 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
058 * reading and sorting respectively.</p>
059 * 
060 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
061 * {@link CompressionType} used to compress key/value pairs:
062 * <ol>
063 *   <li>
064 *   <code>Writer</code> : Uncompressed records.
065 *   </li>
066 *   <li>
067 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
068 *                                       values.
069 *   </li>
070 *   <li>
071 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys & 
072 *                                      values are collected in 'blocks' 
073 *                                      separately and compressed. The size of 
074 *                                      the 'block' is configurable.
075 * </ol>
076 * 
077 * <p>The actual compression algorithm used to compress key and/or values can be
078 * specified by using the appropriate {@link CompressionCodec}.</p>
079 * 
080 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
081 * provided by the <code>SequenceFile</code> to chose the preferred format.</p>
082 *
083 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
084 * above <code>SequenceFile</code> formats.</p>
085 *
086 * <h4 id="Formats">SequenceFile Formats</h4>
087 * 
088 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
089 * depending on the <code>CompressionType</code> specified. All of them share a
090 * <a href="#Header">common header</a> described below.
091 * 
092 * <h5 id="Header">SequenceFile Header</h5>
093 * <ul>
094 *   <li>
095 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
096 *             version number (e.g. SEQ4 or SEQ6)
097 *   </li>
098 *   <li>
099 *   keyClassName -key class
100 *   </li>
101 *   <li>
102 *   valueClassName - value class
103 *   </li>
104 *   <li>
105 *   compression - A boolean which specifies if compression is turned on for 
106 *                 keys/values in this file.
107 *   </li>
108 *   <li>
109 *   blockCompression - A boolean which specifies if block-compression is 
110 *                      turned on for keys/values in this file.
111 *   </li>
112 *   <li>
113 *   compression codec - <code>CompressionCodec</code> class which is used for  
114 *                       compression of keys and/or values (if compression is 
115 *                       enabled).
116 *   </li>
117 *   <li>
118 *   metadata - {@link Metadata} for this file.
119 *   </li>
120 *   <li>
121 *   sync - A sync marker to denote end of the header.
122 *   </li>
123 * </ul>
124 * 
125 * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5>
126 * <ul>
127 * <li>
128 * <a href="#Header">Header</a>
129 * </li>
130 * <li>
131 * Record
132 *   <ul>
133 *     <li>Record length</li>
134 *     <li>Key length</li>
135 *     <li>Key</li>
136 *     <li>Value</li>
137 *   </ul>
138 * </li>
139 * <li>
140 * A sync-marker every few <code>100</code> bytes or so.
141 * </li>
142 * </ul>
143 *
144 * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
145 * <ul>
146 * <li>
147 * <a href="#Header">Header</a>
148 * </li>
149 * <li>
150 * Record
151 *   <ul>
152 *     <li>Record length</li>
153 *     <li>Key length</li>
154 *     <li>Key</li>
155 *     <li><i>Compressed</i> Value</li>
156 *   </ul>
157 * </li>
158 * <li>
159 * A sync-marker every few <code>100</code> bytes or so.
160 * </li>
161 * </ul>
162 * 
163 * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
164 * <ul>
165 * <li>
166 * <a href="#Header">Header</a>
167 * </li>
168 * <li>
169 * Record <i>Block</i>
170 *   <ul>
171 *     <li>Uncompressed number of records in the block</li>
172 *     <li>Compressed key-lengths block-size</li>
173 *     <li>Compressed key-lengths block</li>
174 *     <li>Compressed keys block-size</li>
175 *     <li>Compressed keys block</li>
176 *     <li>Compressed value-lengths block-size</li>
177 *     <li>Compressed value-lengths block</li>
178 *     <li>Compressed values block-size</li>
179 *     <li>Compressed values block</li>
180 *   </ul>
181 * </li>
182 * <li>
183 * A sync-marker every block.
184 * </li>
185 * </ul>
186 * 
187 * <p>The compressed blocks of key lengths and value lengths consist of the 
188 * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
189 * format.</p>
190 * 
191 * @see CompressionCodec
192 */
193@InterfaceAudience.Public
194@InterfaceStability.Stable
195public class SequenceFile {
196  private static final Log LOG = LogFactory.getLog(SequenceFile.class);
197
198  private SequenceFile() {}                         // no public ctor
199
200  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
201  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
202  private static final byte VERSION_WITH_METADATA = (byte)6;
203  private static byte[] VERSION = new byte[] {
204    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
205  };
206
207  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
208  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
209  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
210
211  /** The number of bytes between sync points.*/
212  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
213
214  /** 
215   * The compression type used to compress key/value pairs in the 
216   * {@link SequenceFile}.
217   * 
218   * @see SequenceFile.Writer
219   */
220  public static enum CompressionType {
221    /** Do not compress records. */
222    NONE, 
223    /** Compress values only, each separately. */
224    RECORD,
225    /** Compress sequences of records together in blocks. */
226    BLOCK
227  }
228
229  /**
230   * Get the compression type for the reduce outputs
231   * @param job the job config to look in
232   * @return the kind of compression to use
233   */
234  static public CompressionType getDefaultCompressionType(Configuration job) {
235    String name = job.get("io.seqfile.compression.type");
236    return name == null ? CompressionType.RECORD : 
237      CompressionType.valueOf(name);
238  }
239  
240  /**
241   * Set the default compression type for sequence files.
242   * @param job the configuration to modify
243   * @param val the new compression type (none, block, record)
244   */
245  static public void setDefaultCompressionType(Configuration job, 
246                                               CompressionType val) {
247    job.set("io.seqfile.compression.type", val.toString());
248  }
249
250  /**
251   * Create a new Writer with the given options.
252   * @param conf the configuration to use
253   * @param opts the options to create the file with
254   * @return a new Writer
255   * @throws IOException
256   */
257  public static Writer createWriter(Configuration conf, Writer.Option... opts
258                                    ) throws IOException {
259    Writer.CompressionOption compressionOption = 
260      Options.getOption(Writer.CompressionOption.class, opts);
261    CompressionType kind;
262    if (compressionOption != null) {
263      kind = compressionOption.getValue();
264    } else {
265      kind = getDefaultCompressionType(conf);
266      opts = Options.prependOptions(opts, Writer.compression(kind));
267    }
268    switch (kind) {
269      default:
270      case NONE:
271        return new Writer(conf, opts);
272      case RECORD:
273        return new RecordCompressWriter(conf, opts);
274      case BLOCK:
275        return new BlockCompressWriter(conf, opts);
276    }
277  }
278
279  /**
280   * Construct the preferred type of SequenceFile Writer.
281   * @param fs The configured filesystem. 
282   * @param conf The configuration.
283   * @param name The name of the file. 
284   * @param keyClass The 'key' type.
285   * @param valClass The 'value' type.
286   * @return Returns the handle to the constructed SequenceFile Writer.
287   * @throws IOException
288   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
289   *     instead.
290   */
291  @Deprecated
292  public static Writer 
293    createWriter(FileSystem fs, Configuration conf, Path name, 
294                 Class keyClass, Class valClass) throws IOException {
295    return createWriter(conf, Writer.filesystem(fs),
296                        Writer.file(name), Writer.keyClass(keyClass),
297                        Writer.valueClass(valClass));
298  }
299  
300  /**
301   * Construct the preferred type of SequenceFile Writer.
302   * @param fs The configured filesystem. 
303   * @param conf The configuration.
304   * @param name The name of the file. 
305   * @param keyClass The 'key' type.
306   * @param valClass The 'value' type.
307   * @param compressionType The compression type.
308   * @return Returns the handle to the constructed SequenceFile Writer.
309   * @throws IOException
310   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
311   *     instead.
312   */
313  @Deprecated
314  public static Writer 
315    createWriter(FileSystem fs, Configuration conf, Path name, 
316                 Class keyClass, Class valClass, 
317                 CompressionType compressionType) throws IOException {
318    return createWriter(conf, Writer.filesystem(fs),
319                        Writer.file(name), Writer.keyClass(keyClass),
320                        Writer.valueClass(valClass), 
321                        Writer.compression(compressionType));
322  }
323  
324  /**
325   * Construct the preferred type of SequenceFile Writer.
326   * @param fs The configured filesystem. 
327   * @param conf The configuration.
328   * @param name The name of the file. 
329   * @param keyClass The 'key' type.
330   * @param valClass The 'value' type.
331   * @param compressionType The compression type.
332   * @param progress The Progressable object to track progress.
333   * @return Returns the handle to the constructed SequenceFile Writer.
334   * @throws IOException
335   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
336   *     instead.
337   */
338  @Deprecated
339  public static Writer
340    createWriter(FileSystem fs, Configuration conf, Path name, 
341                 Class keyClass, Class valClass, CompressionType compressionType,
342                 Progressable progress) throws IOException {
343    return createWriter(conf, Writer.file(name),
344                        Writer.filesystem(fs),
345                        Writer.keyClass(keyClass),
346                        Writer.valueClass(valClass), 
347                        Writer.compression(compressionType),
348                        Writer.progressable(progress));
349  }
350
351  /**
352   * Construct the preferred type of SequenceFile Writer.
353   * @param fs The configured filesystem. 
354   * @param conf The configuration.
355   * @param name The name of the file. 
356   * @param keyClass The 'key' type.
357   * @param valClass The 'value' type.
358   * @param compressionType The compression type.
359   * @param codec The compression codec.
360   * @return Returns the handle to the constructed SequenceFile Writer.
361   * @throws IOException
362   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
363   *     instead.
364   */
365  @Deprecated
366  public static Writer 
367    createWriter(FileSystem fs, Configuration conf, Path name, 
368                 Class keyClass, Class valClass, CompressionType compressionType, 
369                 CompressionCodec codec) throws IOException {
370    return createWriter(conf, Writer.file(name),
371                        Writer.filesystem(fs),
372                        Writer.keyClass(keyClass),
373                        Writer.valueClass(valClass), 
374                        Writer.compression(compressionType, codec));
375  }
376  
377  /**
378   * Construct the preferred type of SequenceFile Writer.
379   * @param fs The configured filesystem. 
380   * @param conf The configuration.
381   * @param name The name of the file. 
382   * @param keyClass The 'key' type.
383   * @param valClass The 'value' type.
384   * @param compressionType The compression type.
385   * @param codec The compression codec.
386   * @param progress The Progressable object to track progress.
387   * @param metadata The metadata of the file.
388   * @return Returns the handle to the constructed SequenceFile Writer.
389   * @throws IOException
390   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
391   *     instead.
392   */
393  @Deprecated
394  public static Writer
395    createWriter(FileSystem fs, Configuration conf, Path name, 
396                 Class keyClass, Class valClass, 
397                 CompressionType compressionType, CompressionCodec codec,
398                 Progressable progress, Metadata metadata) throws IOException {
399    return createWriter(conf, Writer.file(name),
400                        Writer.filesystem(fs),
401                        Writer.keyClass(keyClass),
402                        Writer.valueClass(valClass),
403                        Writer.compression(compressionType, codec),
404                        Writer.progressable(progress),
405                        Writer.metadata(metadata));
406  }
407
408  /**
409   * Construct the preferred type of SequenceFile Writer.
410   * @param fs The configured filesystem.
411   * @param conf The configuration.
412   * @param name The name of the file.
413   * @param keyClass The 'key' type.
414   * @param valClass The 'value' type.
415   * @param bufferSize buffer size for the underlaying outputstream.
416   * @param replication replication factor for the file.
417   * @param blockSize block size for the file.
418   * @param compressionType The compression type.
419   * @param codec The compression codec.
420   * @param progress The Progressable object to track progress.
421   * @param metadata The metadata of the file.
422   * @return Returns the handle to the constructed SequenceFile Writer.
423   * @throws IOException
424   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
425   *     instead.
426   */
427  @Deprecated
428  public static Writer
429    createWriter(FileSystem fs, Configuration conf, Path name,
430                 Class keyClass, Class valClass, int bufferSize,
431                 short replication, long blockSize,
432                 CompressionType compressionType, CompressionCodec codec,
433                 Progressable progress, Metadata metadata) throws IOException {
434    return createWriter(conf, Writer.file(name),
435                        Writer.filesystem(fs),
436                        Writer.keyClass(keyClass),
437                        Writer.valueClass(valClass), 
438                        Writer.bufferSize(bufferSize), 
439                        Writer.replication(replication),
440                        Writer.blockSize(blockSize),
441                        Writer.compression(compressionType, codec),
442                        Writer.progressable(progress),
443                        Writer.metadata(metadata));
444  }
445
446  /**
447   * Construct the preferred type of SequenceFile Writer.
448   * @param fs The configured filesystem.
449   * @param conf The configuration.
450   * @param name The name of the file.
451   * @param keyClass The 'key' type.
452   * @param valClass The 'value' type.
453   * @param bufferSize buffer size for the underlaying outputstream.
454   * @param replication replication factor for the file.
455   * @param blockSize block size for the file.
456   * @param createParent create parent directory if non-existent
457   * @param compressionType The compression type.
458   * @param codec The compression codec.
459   * @param metadata The metadata of the file.
460   * @return Returns the handle to the constructed SequenceFile Writer.
461   * @throws IOException
462   */
463  @Deprecated
464  public static Writer
465  createWriter(FileSystem fs, Configuration conf, Path name,
466               Class keyClass, Class valClass, int bufferSize,
467               short replication, long blockSize, boolean createParent,
468               CompressionType compressionType, CompressionCodec codec,
469               Metadata metadata) throws IOException {
470    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
471        conf, name, keyClass, valClass, compressionType, codec,
472        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
473        CreateOpts.bufferSize(bufferSize),
474        createParent ? CreateOpts.createParent()
475                     : CreateOpts.donotCreateParent(),
476        CreateOpts.repFac(replication),
477        CreateOpts.blockSize(blockSize)
478      );
479  }
480
481  /**
482   * Construct the preferred type of SequenceFile Writer.
483   * @param fc The context for the specified file.
484   * @param conf The configuration.
485   * @param name The name of the file.
486   * @param keyClass The 'key' type.
487   * @param valClass The 'value' type.
488   * @param compressionType The compression type.
489   * @param codec The compression codec.
490   * @param metadata The metadata of the file.
491   * @param createFlag gives the semantics of create: overwrite, append etc.
492   * @param opts file creation options; see {@link CreateOpts}.
493   * @return Returns the handle to the constructed SequenceFile Writer.
494   * @throws IOException
495   */
496  public static Writer
497  createWriter(FileContext fc, Configuration conf, Path name,
498               Class keyClass, Class valClass,
499               CompressionType compressionType, CompressionCodec codec,
500               Metadata metadata,
501               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
502               throws IOException {
503    return createWriter(conf, fc.create(name, createFlag, opts),
504          keyClass, valClass, compressionType, codec, metadata).ownStream();
505  }
506
507  /**
508   * Construct the preferred type of SequenceFile Writer.
509   * @param fs The configured filesystem. 
510   * @param conf The configuration.
511   * @param name The name of the file. 
512   * @param keyClass The 'key' type.
513   * @param valClass The 'value' type.
514   * @param compressionType The compression type.
515   * @param codec The compression codec.
516   * @param progress The Progressable object to track progress.
517   * @return Returns the handle to the constructed SequenceFile Writer.
518   * @throws IOException
519   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
520   *     instead.
521   */
522  @Deprecated
523  public static Writer
524    createWriter(FileSystem fs, Configuration conf, Path name, 
525                 Class keyClass, Class valClass, 
526                 CompressionType compressionType, CompressionCodec codec,
527                 Progressable progress) throws IOException {
528    return createWriter(conf, Writer.file(name),
529                        Writer.filesystem(fs),
530                        Writer.keyClass(keyClass),
531                        Writer.valueClass(valClass),
532                        Writer.compression(compressionType, codec),
533                        Writer.progressable(progress));
534  }
535
536  /**
537   * Construct the preferred type of 'raw' SequenceFile Writer.
538   * @param conf The configuration.
539   * @param out The stream on top which the writer is to be constructed.
540   * @param keyClass The 'key' type.
541   * @param valClass The 'value' type.
542   * @param compressionType The compression type.
543   * @param codec The compression codec.
544   * @param metadata The metadata of the file.
545   * @return Returns the handle to the constructed SequenceFile Writer.
546   * @throws IOException
547   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
548   *     instead.
549   */
550  @Deprecated
551  public static Writer
552    createWriter(Configuration conf, FSDataOutputStream out, 
553                 Class keyClass, Class valClass,
554                 CompressionType compressionType,
555                 CompressionCodec codec, Metadata metadata) throws IOException {
556    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
557                        Writer.valueClass(valClass), 
558                        Writer.compression(compressionType, codec),
559                        Writer.metadata(metadata));
560  }
561  
562  /**
563   * Construct the preferred type of 'raw' SequenceFile Writer.
564   * @param conf The configuration.
565   * @param out The stream on top which the writer is to be constructed.
566   * @param keyClass The 'key' type.
567   * @param valClass The 'value' type.
568   * @param compressionType The compression type.
569   * @param codec The compression codec.
570   * @return Returns the handle to the constructed SequenceFile Writer.
571   * @throws IOException
572   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
573   *     instead.
574   */
575  @Deprecated
576  public static Writer
577    createWriter(Configuration conf, FSDataOutputStream out, 
578                 Class keyClass, Class valClass, CompressionType compressionType,
579                 CompressionCodec codec) throws IOException {
580    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
581                        Writer.valueClass(valClass),
582                        Writer.compression(compressionType, codec));
583  }
584  
585
586  /** The interface to 'raw' values of SequenceFiles. */
587  public static interface ValueBytes {
588
589    /** Writes the uncompressed bytes to the outStream.
590     * @param outStream : Stream to write uncompressed bytes into.
591     * @throws IOException
592     */
593    public void writeUncompressedBytes(DataOutputStream outStream)
594      throws IOException;
595
596    /** Write compressed bytes to outStream. 
597     * Note: that it will NOT compress the bytes if they are not compressed.
598     * @param outStream : Stream to write compressed bytes into.
599     */
600    public void writeCompressedBytes(DataOutputStream outStream) 
601      throws IllegalArgumentException, IOException;
602
603    /**
604     * Size of stored data.
605     */
606    public int getSize();
607  }
608  
609  private static class UncompressedBytes implements ValueBytes {
610    private int dataSize;
611    private byte[] data;
612    
613    private UncompressedBytes() {
614      data = null;
615      dataSize = 0;
616    }
617    
618    private void reset(DataInputStream in, int length) throws IOException {
619      if (data == null) {
620        data = new byte[length];
621      } else if (length > data.length) {
622        data = new byte[Math.max(length, data.length * 2)];
623      }
624      dataSize = -1;
625      in.readFully(data, 0, length);
626      dataSize = length;
627    }
628    
629    @Override
630    public int getSize() {
631      return dataSize;
632    }
633    
634    @Override
635    public void writeUncompressedBytes(DataOutputStream outStream)
636      throws IOException {
637      outStream.write(data, 0, dataSize);
638    }
639
640    @Override
641    public void writeCompressedBytes(DataOutputStream outStream) 
642      throws IllegalArgumentException, IOException {
643      throw 
644        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
645    }
646
647  } // UncompressedBytes
648  
649  private static class CompressedBytes implements ValueBytes {
650    private int dataSize;
651    private byte[] data;
652    DataInputBuffer rawData = null;
653    CompressionCodec codec = null;
654    CompressionInputStream decompressedStream = null;
655
656    private CompressedBytes(CompressionCodec codec) {
657      data = null;
658      dataSize = 0;
659      this.codec = codec;
660    }
661
662    private void reset(DataInputStream in, int length) throws IOException {
663      if (data == null) {
664        data = new byte[length];
665      } else if (length > data.length) {
666        data = new byte[Math.max(length, data.length * 2)];
667      } 
668      dataSize = -1;
669      in.readFully(data, 0, length);
670      dataSize = length;
671    }
672    
673    @Override
674    public int getSize() {
675      return dataSize;
676    }
677    
678    @Override
679    public void writeUncompressedBytes(DataOutputStream outStream)
680      throws IOException {
681      if (decompressedStream == null) {
682        rawData = new DataInputBuffer();
683        decompressedStream = codec.createInputStream(rawData);
684      } else {
685        decompressedStream.resetState();
686      }
687      rawData.reset(data, 0, dataSize);
688
689      byte[] buffer = new byte[8192];
690      int bytesRead = 0;
691      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
692        outStream.write(buffer, 0, bytesRead);
693      }
694    }
695
696    @Override
697    public void writeCompressedBytes(DataOutputStream outStream) 
698      throws IllegalArgumentException, IOException {
699      outStream.write(data, 0, dataSize);
700    }
701
702  } // CompressedBytes
703  
704  /**
705   * The class encapsulating with the metadata of a file.
706   * The metadata of a file is a list of attribute name/value
707   * pairs of Text type.
708   *
709   */
710  public static class Metadata implements Writable {
711
712    private TreeMap<Text, Text> theMetadata;
713    
714    public Metadata() {
715      this(new TreeMap<Text, Text>());
716    }
717    
718    public Metadata(TreeMap<Text, Text> arg) {
719      if (arg == null) {
720        this.theMetadata = new TreeMap<Text, Text>();
721      } else {
722        this.theMetadata = arg;
723      }
724    }
725    
726    public Text get(Text name) {
727      return this.theMetadata.get(name);
728    }
729    
730    public void set(Text name, Text value) {
731      this.theMetadata.put(name, value);
732    }
733    
734    public TreeMap<Text, Text> getMetadata() {
735      return new TreeMap<Text, Text>(this.theMetadata);
736    }
737    
738    @Override
739    public void write(DataOutput out) throws IOException {
740      out.writeInt(this.theMetadata.size());
741      Iterator<Map.Entry<Text, Text>> iter =
742        this.theMetadata.entrySet().iterator();
743      while (iter.hasNext()) {
744        Map.Entry<Text, Text> en = iter.next();
745        en.getKey().write(out);
746        en.getValue().write(out);
747      }
748    }
749
750    @Override
751    public void readFields(DataInput in) throws IOException {
752      int sz = in.readInt();
753      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
754      this.theMetadata = new TreeMap<Text, Text>();
755      for (int i = 0; i < sz; i++) {
756        Text key = new Text();
757        Text val = new Text();
758        key.readFields(in);
759        val.readFields(in);
760        this.theMetadata.put(key, val);
761      }    
762    }
763
764    @Override
765    public boolean equals(Object other) {
766      if (other == null) {
767        return false;
768      }
769      if (other.getClass() != this.getClass()) {
770        return false;
771      } else {
772        return equals((Metadata)other);
773      }
774    }
775    
776    public boolean equals(Metadata other) {
777      if (other == null) return false;
778      if (this.theMetadata.size() != other.theMetadata.size()) {
779        return false;
780      }
781      Iterator<Map.Entry<Text, Text>> iter1 =
782        this.theMetadata.entrySet().iterator();
783      Iterator<Map.Entry<Text, Text>> iter2 =
784        other.theMetadata.entrySet().iterator();
785      while (iter1.hasNext() && iter2.hasNext()) {
786        Map.Entry<Text, Text> en1 = iter1.next();
787        Map.Entry<Text, Text> en2 = iter2.next();
788        if (!en1.getKey().equals(en2.getKey())) {
789          return false;
790        }
791        if (!en1.getValue().equals(en2.getValue())) {
792          return false;
793        }
794      }
795      if (iter1.hasNext() || iter2.hasNext()) {
796        return false;
797      }
798      return true;
799    }
800
801    @Override
802    public int hashCode() {
803      assert false : "hashCode not designed";
804      return 42; // any arbitrary constant will do 
805    }
806    
807    @Override
808    public String toString() {
809      StringBuilder sb = new StringBuilder();
810      sb.append("size: ").append(this.theMetadata.size()).append("\n");
811      Iterator<Map.Entry<Text, Text>> iter =
812        this.theMetadata.entrySet().iterator();
813      while (iter.hasNext()) {
814        Map.Entry<Text, Text> en = iter.next();
815        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
816        sb.append("\n");
817      }
818      return sb.toString();
819    }
820  }
821  
822  /** Write key/value pairs to a sequence-format file. */
823  public static class Writer implements java.io.Closeable, Syncable {
824    private Configuration conf;
825    FSDataOutputStream out;
826    boolean ownOutputStream = true;
827    DataOutputBuffer buffer = new DataOutputBuffer();
828
829    Class keyClass;
830    Class valClass;
831
832    private final CompressionType compress;
833    CompressionCodec codec = null;
834    CompressionOutputStream deflateFilter = null;
835    DataOutputStream deflateOut = null;
836    Metadata metadata = null;
837    Compressor compressor = null;
838
839    private boolean appendMode = false;
840
841    protected Serializer keySerializer;
842    protected Serializer uncompressedValSerializer;
843    protected Serializer compressedValSerializer;
844    
845    // Insert a globally unique 16-byte value every few entries, so that one
846    // can seek into the middle of a file and then synchronize with record
847    // starts and ends by scanning for this value.
848    long lastSyncPos;                     // position of last sync
849    byte[] sync;                          // 16 random bytes
850    {
851      try {                                       
852        MessageDigest digester = MessageDigest.getInstance("MD5");
853        long time = Time.now();
854        digester.update((new UID()+"@"+time).getBytes());
855        sync = digester.digest();
856      } catch (Exception e) {
857        throw new RuntimeException(e);
858      }
859    }
860
861    public static interface Option {}
862    
863    static class FileOption extends Options.PathOption 
864                                    implements Option {
865      FileOption(Path path) {
866        super(path);
867      }
868    }
869
870    /**
871     * @deprecated only used for backwards-compatibility in the createWriter methods
872     * that take FileSystem.
873     */
874    @Deprecated
875    private static class FileSystemOption implements Option {
876      private final FileSystem value;
877      protected FileSystemOption(FileSystem value) {
878        this.value = value;
879      }
880      public FileSystem getValue() {
881        return value;
882      }
883    }
884
885    static class StreamOption extends Options.FSDataOutputStreamOption 
886                              implements Option {
887      StreamOption(FSDataOutputStream stream) {
888        super(stream);
889      }
890    }
891
892    static class BufferSizeOption extends Options.IntegerOption
893                                  implements Option {
894      BufferSizeOption(int value) {
895        super(value);
896      }
897    }
898    
899    static class BlockSizeOption extends Options.LongOption implements Option {
900      BlockSizeOption(long value) {
901        super(value);
902      }
903    }
904
905    static class ReplicationOption extends Options.IntegerOption
906                                   implements Option {
907      ReplicationOption(int value) {
908        super(value);
909      }
910    }
911
912    static class AppendIfExistsOption extends Options.BooleanOption implements
913        Option {
914      AppendIfExistsOption(boolean value) {
915        super(value);
916      }
917    }
918
919    static class KeyClassOption extends Options.ClassOption implements Option {
920      KeyClassOption(Class<?> value) {
921        super(value);
922      }
923    }
924
925    static class ValueClassOption extends Options.ClassOption
926                                          implements Option {
927      ValueClassOption(Class<?> value) {
928        super(value);
929      }
930    }
931
932    static class MetadataOption implements Option {
933      private final Metadata value;
934      MetadataOption(Metadata value) {
935        this.value = value;
936      }
937      Metadata getValue() {
938        return value;
939      }
940    }
941
942    static class ProgressableOption extends Options.ProgressableOption
943                                    implements Option {
944      ProgressableOption(Progressable value) {
945        super(value);
946      }
947    }
948
949    private static class CompressionOption implements Option {
950      private final CompressionType value;
951      private final CompressionCodec codec;
952      CompressionOption(CompressionType value) {
953        this(value, null);
954      }
955      CompressionOption(CompressionType value, CompressionCodec codec) {
956        this.value = value;
957        this.codec = (CompressionType.NONE != value && null == codec)
958          ? new DefaultCodec()
959          : codec;
960      }
961      CompressionType getValue() {
962        return value;
963      }
964      CompressionCodec getCodec() {
965        return codec;
966      }
967    }
968
969    public static Option file(Path value) {
970      return new FileOption(value);
971    }
972
973    /**
974     * @deprecated only used for backwards-compatibility in the createWriter methods
975     * that take FileSystem.
976     */
977    @Deprecated
978    private static Option filesystem(FileSystem fs) {
979      return new SequenceFile.Writer.FileSystemOption(fs);
980    }
981    
982    public static Option bufferSize(int value) {
983      return new BufferSizeOption(value);
984    }
985    
986    public static Option stream(FSDataOutputStream value) {
987      return new StreamOption(value);
988    }
989    
990    public static Option replication(short value) {
991      return new ReplicationOption(value);
992    }
993    
994    public static Option appendIfExists(boolean value) {
995      return new AppendIfExistsOption(value);
996    }
997
998    public static Option blockSize(long value) {
999      return new BlockSizeOption(value);
1000    }
1001    
1002    public static Option progressable(Progressable value) {
1003      return new ProgressableOption(value);
1004    }
1005
1006    public static Option keyClass(Class<?> value) {
1007      return new KeyClassOption(value);
1008    }
1009    
1010    public static Option valueClass(Class<?> value) {
1011      return new ValueClassOption(value);
1012    }
1013    
1014    public static Option metadata(Metadata value) {
1015      return new MetadataOption(value);
1016    }
1017
1018    public static Option compression(CompressionType value) {
1019      return new CompressionOption(value);
1020    }
1021
1022    public static Option compression(CompressionType value,
1023        CompressionCodec codec) {
1024      return new CompressionOption(value, codec);
1025    }
1026    
1027    /**
1028     * Construct a uncompressed writer from a set of options.
1029     * @param conf the configuration to use
1030     * @param options the options used when creating the writer
1031     * @throws IOException if it fails
1032     */
1033    Writer(Configuration conf, 
1034           Option... opts) throws IOException {
1035      BlockSizeOption blockSizeOption = 
1036        Options.getOption(BlockSizeOption.class, opts);
1037      BufferSizeOption bufferSizeOption = 
1038        Options.getOption(BufferSizeOption.class, opts);
1039      ReplicationOption replicationOption = 
1040        Options.getOption(ReplicationOption.class, opts);
1041      ProgressableOption progressOption = 
1042        Options.getOption(ProgressableOption.class, opts);
1043      FileOption fileOption = Options.getOption(FileOption.class, opts);
1044      AppendIfExistsOption appendIfExistsOption = Options.getOption(
1045          AppendIfExistsOption.class, opts);
1046      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
1047      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
1048      KeyClassOption keyClassOption = 
1049        Options.getOption(KeyClassOption.class, opts);
1050      ValueClassOption valueClassOption = 
1051        Options.getOption(ValueClassOption.class, opts);
1052      MetadataOption metadataOption = 
1053        Options.getOption(MetadataOption.class, opts);
1054      CompressionOption compressionTypeOption =
1055        Options.getOption(CompressionOption.class, opts);
1056      // check consistency of options
1057      if ((fileOption == null) == (streamOption == null)) {
1058        throw new IllegalArgumentException("file or stream must be specified");
1059      }
1060      if (fileOption == null && (blockSizeOption != null ||
1061                                 bufferSizeOption != null ||
1062                                 replicationOption != null ||
1063                                 progressOption != null)) {
1064        throw new IllegalArgumentException("file modifier options not " +
1065                                           "compatible with stream");
1066      }
1067
1068      FSDataOutputStream out;
1069      boolean ownStream = fileOption != null;
1070      if (ownStream) {
1071        Path p = fileOption.getValue();
1072        FileSystem fs;
1073        if (fsOption != null) {
1074          fs = fsOption.getValue();
1075        } else {
1076          fs = p.getFileSystem(conf);
1077        }
1078        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
1079          bufferSizeOption.getValue();
1080        short replication = replicationOption == null ? 
1081          fs.getDefaultReplication(p) :
1082          (short) replicationOption.getValue();
1083        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
1084          blockSizeOption.getValue();
1085        Progressable progress = progressOption == null ? null :
1086          progressOption.getValue();
1087
1088        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
1089            && fs.exists(p)) {
1090
1091          // Read the file and verify header details
1092          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
1093              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
1094          try {
1095
1096            if (keyClassOption.getValue() != reader.getKeyClass()
1097                || valueClassOption.getValue() != reader.getValueClass()) {
1098              throw new IllegalArgumentException(
1099                  "Key/value class provided does not match the file");
1100            }
1101
1102            if (reader.getVersion() != VERSION[3]) {
1103              throw new VersionMismatchException(VERSION[3],
1104                  reader.getVersion());
1105            }
1106
1107            if (metadataOption != null) {
1108              LOG.info("MetaData Option is ignored during append");
1109            }
1110            metadataOption = (MetadataOption) SequenceFile.Writer
1111                .metadata(reader.getMetadata());
1112
1113            CompressionOption readerCompressionOption = new CompressionOption(
1114                reader.getCompressionType(), reader.getCompressionCodec());
1115
1116            if (readerCompressionOption.value != compressionTypeOption.value
1117                || !readerCompressionOption.codec.getClass().getName()
1118                    .equals(compressionTypeOption.codec.getClass().getName())) {
1119              throw new IllegalArgumentException(
1120                  "Compression option provided does not match the file");
1121            }
1122
1123            sync = reader.getSync();
1124
1125          } finally {
1126            reader.close();
1127          }
1128
1129          out = fs.append(p, bufferSize, progress);
1130          this.appendMode = true;
1131        } else {
1132          out = fs
1133              .create(p, true, bufferSize, replication, blockSize, progress);
1134        }
1135      } else {
1136        out = streamOption.getValue();
1137      }
1138      Class<?> keyClass = keyClassOption == null ?
1139          Object.class : keyClassOption.getValue();
1140      Class<?> valueClass = valueClassOption == null ?
1141          Object.class : valueClassOption.getValue();
1142      Metadata metadata = metadataOption == null ?
1143          new Metadata() : metadataOption.getValue();
1144      this.compress = compressionTypeOption.getValue();
1145      final CompressionCodec codec = compressionTypeOption.getCodec();
1146      if (codec != null &&
1147          (codec instanceof GzipCodec) &&
1148          !NativeCodeLoader.isNativeCodeLoaded() &&
1149          !ZlibFactory.isNativeZlibLoaded(conf)) {
1150        throw new IllegalArgumentException("SequenceFile doesn't work with " +
1151                                           "GzipCodec without native-hadoop " +
1152                                           "code!");
1153      }
1154      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
1155    }
1156
1157    /** Create the named file.
1158     * @deprecated Use 
1159     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1160     *   instead.
1161     */
1162    @Deprecated
1163    public Writer(FileSystem fs, Configuration conf, Path name, 
1164                  Class keyClass, Class valClass) throws IOException {
1165      this.compress = CompressionType.NONE;
1166      init(conf, fs.create(name), true, keyClass, valClass, null, 
1167           new Metadata());
1168    }
1169    
1170    /** Create the named file with write-progress reporter.
1171     * @deprecated Use 
1172     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1173     *   instead.
1174     */
1175    @Deprecated
1176    public Writer(FileSystem fs, Configuration conf, Path name, 
1177                  Class keyClass, Class valClass,
1178                  Progressable progress, Metadata metadata) throws IOException {
1179      this.compress = CompressionType.NONE;
1180      init(conf, fs.create(name, progress), true, keyClass, valClass,
1181           null, metadata);
1182    }
1183    
1184    /** Create the named file with write-progress reporter. 
1185     * @deprecated Use 
1186     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1187     *   instead.
1188     */
1189    @Deprecated
1190    public Writer(FileSystem fs, Configuration conf, Path name,
1191                  Class keyClass, Class valClass,
1192                  int bufferSize, short replication, long blockSize,
1193                  Progressable progress, Metadata metadata) throws IOException {
1194      this.compress = CompressionType.NONE;
1195      init(conf,
1196           fs.create(name, true, bufferSize, replication, blockSize, progress),
1197           true, keyClass, valClass, null, metadata);
1198    }
1199
1200    boolean isCompressed() { return compress != CompressionType.NONE; }
1201    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
1202    
1203    Writer ownStream() { this.ownOutputStream = true; return this;  }
1204
1205    /** Write and flush the file header. */
1206    private void writeFileHeader() 
1207      throws IOException {
1208      out.write(VERSION);
1209      Text.writeString(out, keyClass.getName());
1210      Text.writeString(out, valClass.getName());
1211      
1212      out.writeBoolean(this.isCompressed());
1213      out.writeBoolean(this.isBlockCompressed());
1214      
1215      if (this.isCompressed()) {
1216        Text.writeString(out, (codec.getClass()).getName());
1217      }
1218      this.metadata.write(out);
1219      out.write(sync);                       // write the sync bytes
1220      out.flush();                           // flush header
1221    }
1222
1223    /** Initialize. */
1224    @SuppressWarnings("unchecked")
1225    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
1226              Class keyClass, Class valClass,
1227              CompressionCodec codec, Metadata metadata) 
1228      throws IOException {
1229      this.conf = conf;
1230      this.out = out;
1231      this.ownOutputStream = ownStream;
1232      this.keyClass = keyClass;
1233      this.valClass = valClass;
1234      this.codec = codec;
1235      this.metadata = metadata;
1236      SerializationFactory serializationFactory = new SerializationFactory(conf);
1237      this.keySerializer = serializationFactory.getSerializer(keyClass);
1238      if (this.keySerializer == null) {
1239        throw new IOException(
1240            "Could not find a serializer for the Key class: '"
1241                + keyClass.getCanonicalName() + "'. "
1242                + "Please ensure that the configuration '" +
1243                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1244                + "properly configured, if you're using"
1245                + "custom serialization.");
1246      }
1247      this.keySerializer.open(buffer);
1248      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
1249      if (this.uncompressedValSerializer == null) {
1250        throw new IOException(
1251            "Could not find a serializer for the Value class: '"
1252                + valClass.getCanonicalName() + "'. "
1253                + "Please ensure that the configuration '" +
1254                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1255                + "properly configured, if you're using"
1256                + "custom serialization.");
1257      }
1258      this.uncompressedValSerializer.open(buffer);
1259      if (this.codec != null) {
1260        ReflectionUtils.setConf(this.codec, this.conf);
1261        this.compressor = CodecPool.getCompressor(this.codec);
1262        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
1263        this.deflateOut = 
1264          new DataOutputStream(new BufferedOutputStream(deflateFilter));
1265        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
1266        if (this.compressedValSerializer == null) {
1267          throw new IOException(
1268              "Could not find a serializer for the Value class: '"
1269                  + valClass.getCanonicalName() + "'. "
1270                  + "Please ensure that the configuration '" +
1271                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1272                  + "properly configured, if you're using"
1273                  + "custom serialization.");
1274        }
1275        this.compressedValSerializer.open(deflateOut);
1276      }
1277
1278      if (appendMode) {
1279        sync();
1280      } else {
1281        writeFileHeader();
1282      }
1283    }
1284    
1285    /** Returns the class of keys in this file. */
1286    public Class getKeyClass() { return keyClass; }
1287
1288    /** Returns the class of values in this file. */
1289    public Class getValueClass() { return valClass; }
1290
1291    /** Returns the compression codec of data in this file. */
1292    public CompressionCodec getCompressionCodec() { return codec; }
1293    
1294    /** create a sync point */
1295    public void sync() throws IOException {
1296      if (sync != null && lastSyncPos != out.getPos()) {
1297        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
1298        out.write(sync);                          // write sync
1299        lastSyncPos = out.getPos();               // update lastSyncPos
1300      }
1301    }
1302
1303    /**
1304     * flush all currently written data to the file system
1305     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
1306     */
1307    @Deprecated
1308    public void syncFs() throws IOException {
1309      if (out != null) {
1310        out.sync();                               // flush contents to file system
1311      }
1312    }
1313
1314    @Override
1315    public void hsync() throws IOException {
1316      if (out != null) {
1317        out.hsync();
1318      }
1319    }
1320
1321    @Override
1322    public void hflush() throws IOException {
1323      if (out != null) {
1324        out.hflush();
1325      }
1326    }
1327    
1328    /** Returns the configuration of this file. */
1329    Configuration getConf() { return conf; }
1330    
1331    /** Close the file. */
1332    @Override
1333    public synchronized void close() throws IOException {
1334      keySerializer.close();
1335      uncompressedValSerializer.close();
1336      if (compressedValSerializer != null) {
1337        compressedValSerializer.close();
1338      }
1339
1340      CodecPool.returnCompressor(compressor);
1341      compressor = null;
1342      
1343      if (out != null) {
1344        
1345        // Close the underlying stream iff we own it...
1346        if (ownOutputStream) {
1347          out.close();
1348        } else {
1349          out.flush();
1350        }
1351        out = null;
1352      }
1353    }
1354
1355    synchronized void checkAndWriteSync() throws IOException {
1356      if (sync != null &&
1357          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
1358        sync();
1359      }
1360    }
1361
1362    /** Append a key/value pair. */
1363    public void append(Writable key, Writable val)
1364      throws IOException {
1365      append((Object) key, (Object) val);
1366    }
1367
1368    /** Append a key/value pair. */
1369    @SuppressWarnings("unchecked")
1370    public synchronized void append(Object key, Object val)
1371      throws IOException {
1372      if (key.getClass() != keyClass)
1373        throw new IOException("wrong key class: "+key.getClass().getName()
1374                              +" is not "+keyClass);
1375      if (val.getClass() != valClass)
1376        throw new IOException("wrong value class: "+val.getClass().getName()
1377                              +" is not "+valClass);
1378
1379      buffer.reset();
1380
1381      // Append the 'key'
1382      keySerializer.serialize(key);
1383      int keyLength = buffer.getLength();
1384      if (keyLength < 0)
1385        throw new IOException("negative length keys not allowed: " + key);
1386
1387      // Append the 'value'
1388      if (compress == CompressionType.RECORD) {
1389        deflateFilter.resetState();
1390        compressedValSerializer.serialize(val);
1391        deflateOut.flush();
1392        deflateFilter.finish();
1393      } else {
1394        uncompressedValSerializer.serialize(val);
1395      }
1396
1397      // Write the record out
1398      checkAndWriteSync();                                // sync
1399      out.writeInt(buffer.getLength());                   // total record length
1400      out.writeInt(keyLength);                            // key portion length
1401      out.write(buffer.getData(), 0, buffer.getLength()); // data
1402    }
1403
1404    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1405        int keyLength, ValueBytes val) throws IOException {
1406      if (keyLength < 0)
1407        throw new IOException("negative length keys not allowed: " + keyLength);
1408
1409      int valLength = val.getSize();
1410
1411      checkAndWriteSync();
1412      
1413      out.writeInt(keyLength+valLength);          // total record length
1414      out.writeInt(keyLength);                    // key portion length
1415      out.write(keyData, keyOffset, keyLength);   // key
1416      val.writeUncompressedBytes(out);            // value
1417    }
1418
1419    /** Returns the current length of the output file.
1420     *
1421     * <p>This always returns a synchronized position.  In other words,
1422     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
1423     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
1424     * the key may be earlier in the file than key last written when this
1425     * method was called (e.g., with block-compression, it may be the first key
1426     * in the block that was being written when this method was called).
1427     */
1428    public synchronized long getLength() throws IOException {
1429      return out.getPos();
1430    }
1431
1432  } // class Writer
1433
1434  /** Write key/compressed-value pairs to a sequence-format file. */
1435  static class RecordCompressWriter extends Writer {
1436    
1437    RecordCompressWriter(Configuration conf, 
1438                         Option... options) throws IOException {
1439      super(conf, options);
1440    }
1441
1442    /** Append a key/value pair. */
1443    @Override
1444    @SuppressWarnings("unchecked")
1445    public synchronized void append(Object key, Object val)
1446      throws IOException {
1447      if (key.getClass() != keyClass)
1448        throw new IOException("wrong key class: "+key.getClass().getName()
1449                              +" is not "+keyClass);
1450      if (val.getClass() != valClass)
1451        throw new IOException("wrong value class: "+val.getClass().getName()
1452                              +" is not "+valClass);
1453
1454      buffer.reset();
1455
1456      // Append the 'key'
1457      keySerializer.serialize(key);
1458      int keyLength = buffer.getLength();
1459      if (keyLength < 0)
1460        throw new IOException("negative length keys not allowed: " + key);
1461
1462      // Compress 'value' and append it
1463      deflateFilter.resetState();
1464      compressedValSerializer.serialize(val);
1465      deflateOut.flush();
1466      deflateFilter.finish();
1467
1468      // Write the record out
1469      checkAndWriteSync();                                // sync
1470      out.writeInt(buffer.getLength());                   // total record length
1471      out.writeInt(keyLength);                            // key portion length
1472      out.write(buffer.getData(), 0, buffer.getLength()); // data
1473    }
1474
1475    /** Append a key/value pair. */
1476    @Override
1477    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1478        int keyLength, ValueBytes val) throws IOException {
1479
1480      if (keyLength < 0)
1481        throw new IOException("negative length keys not allowed: " + keyLength);
1482
1483      int valLength = val.getSize();
1484      
1485      checkAndWriteSync();                        // sync
1486      out.writeInt(keyLength+valLength);          // total record length
1487      out.writeInt(keyLength);                    // key portion length
1488      out.write(keyData, keyOffset, keyLength);   // 'key' data
1489      val.writeCompressedBytes(out);              // 'value' data
1490    }
1491    
1492  } // RecordCompressionWriter
1493
1494  /** Write compressed key/value blocks to a sequence-format file. */
1495  static class BlockCompressWriter extends Writer {
1496    
1497    private int noBufferedRecords = 0;
1498    
1499    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1500    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1501
1502    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1503    private DataOutputBuffer valBuffer = new DataOutputBuffer();
1504
1505    private final int compressionBlockSize;
1506    
1507    BlockCompressWriter(Configuration conf,
1508                        Option... options) throws IOException {
1509      super(conf, options);
1510      compressionBlockSize = 
1511        conf.getInt("io.seqfile.compress.blocksize", 1000000);
1512      keySerializer.close();
1513      keySerializer.open(keyBuffer);
1514      uncompressedValSerializer.close();
1515      uncompressedValSerializer.open(valBuffer);
1516    }
1517
1518    /** Workhorse to check and write out compressed data/lengths */
1519    private synchronized 
1520      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1521      throws IOException {
1522      deflateFilter.resetState();
1523      buffer.reset();
1524      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1525                       uncompressedDataBuffer.getLength());
1526      deflateOut.flush();
1527      deflateFilter.finish();
1528      
1529      WritableUtils.writeVInt(out, buffer.getLength());
1530      out.write(buffer.getData(), 0, buffer.getLength());
1531    }
1532    
1533    /** Compress and flush contents to dfs */
1534    @Override
1535    public synchronized void sync() throws IOException {
1536      if (noBufferedRecords > 0) {
1537        super.sync();
1538        
1539        // No. of records
1540        WritableUtils.writeVInt(out, noBufferedRecords);
1541        
1542        // Write 'keys' and lengths
1543        writeBuffer(keyLenBuffer);
1544        writeBuffer(keyBuffer);
1545        
1546        // Write 'values' and lengths
1547        writeBuffer(valLenBuffer);
1548        writeBuffer(valBuffer);
1549        
1550        // Flush the file-stream
1551        out.flush();
1552        
1553        // Reset internal states
1554        keyLenBuffer.reset();
1555        keyBuffer.reset();
1556        valLenBuffer.reset();
1557        valBuffer.reset();
1558        noBufferedRecords = 0;
1559      }
1560      
1561    }
1562    
1563    /** Close the file. */
1564    @Override
1565    public synchronized void close() throws IOException {
1566      if (out != null) {
1567        sync();
1568      }
1569      super.close();
1570    }
1571
1572    /** Append a key/value pair. */
1573    @Override
1574    @SuppressWarnings("unchecked")
1575    public synchronized void append(Object key, Object val)
1576      throws IOException {
1577      if (key.getClass() != keyClass)
1578        throw new IOException("wrong key class: "+key+" is not "+keyClass);
1579      if (val.getClass() != valClass)
1580        throw new IOException("wrong value class: "+val+" is not "+valClass);
1581
1582      // Save key/value into respective buffers 
1583      int oldKeyLength = keyBuffer.getLength();
1584      keySerializer.serialize(key);
1585      int keyLength = keyBuffer.getLength() - oldKeyLength;
1586      if (keyLength < 0)
1587        throw new IOException("negative length keys not allowed: " + key);
1588      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1589
1590      int oldValLength = valBuffer.getLength();
1591      uncompressedValSerializer.serialize(val);
1592      int valLength = valBuffer.getLength() - oldValLength;
1593      WritableUtils.writeVInt(valLenBuffer, valLength);
1594      
1595      // Added another key/value pair
1596      ++noBufferedRecords;
1597      
1598      // Compress and flush?
1599      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1600      if (currentBlockSize >= compressionBlockSize) {
1601        sync();
1602      }
1603    }
1604    
1605    /** Append a key/value pair. */
1606    @Override
1607    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1608        int keyLength, ValueBytes val) throws IOException {
1609      
1610      if (keyLength < 0)
1611        throw new IOException("negative length keys not allowed");
1612
1613      int valLength = val.getSize();
1614      
1615      // Save key/value data in relevant buffers
1616      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1617      keyBuffer.write(keyData, keyOffset, keyLength);
1618      WritableUtils.writeVInt(valLenBuffer, valLength);
1619      val.writeUncompressedBytes(valBuffer);
1620
1621      // Added another key/value pair
1622      ++noBufferedRecords;
1623
1624      // Compress and flush?
1625      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1626      if (currentBlockSize >= compressionBlockSize) {
1627        sync();
1628      }
1629    }
1630  
1631  } // BlockCompressionWriter
1632
1633  /** Get the configured buffer size */
1634  private static int getBufferSize(Configuration conf) {
1635    return conf.getInt("io.file.buffer.size", 4096);
1636  }
1637
1638  /** Reads key/value pairs from a sequence-format file. */
1639  public static class Reader implements java.io.Closeable {
1640    private String filename;
1641    private FSDataInputStream in;
1642    private DataOutputBuffer outBuf = new DataOutputBuffer();
1643
1644    private byte version;
1645
1646    private String keyClassName;
1647    private String valClassName;
1648    private Class keyClass;
1649    private Class valClass;
1650
1651    private CompressionCodec codec = null;
1652    private Metadata metadata = null;
1653    
1654    private byte[] sync = new byte[SYNC_HASH_SIZE];
1655    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1656    private boolean syncSeen;
1657
1658    private long headerEnd;
1659    private long end;
1660    private int keyLength;
1661    private int recordLength;
1662
1663    private boolean decompress;
1664    private boolean blockCompressed;
1665    
1666    private Configuration conf;
1667
1668    private int noBufferedRecords = 0;
1669    private boolean lazyDecompress = true;
1670    private boolean valuesDecompressed = true;
1671    
1672    private int noBufferedKeys = 0;
1673    private int noBufferedValues = 0;
1674    
1675    private DataInputBuffer keyLenBuffer = null;
1676    private CompressionInputStream keyLenInFilter = null;
1677    private DataInputStream keyLenIn = null;
1678    private Decompressor keyLenDecompressor = null;
1679    private DataInputBuffer keyBuffer = null;
1680    private CompressionInputStream keyInFilter = null;
1681    private DataInputStream keyIn = null;
1682    private Decompressor keyDecompressor = null;
1683
1684    private DataInputBuffer valLenBuffer = null;
1685    private CompressionInputStream valLenInFilter = null;
1686    private DataInputStream valLenIn = null;
1687    private Decompressor valLenDecompressor = null;
1688    private DataInputBuffer valBuffer = null;
1689    private CompressionInputStream valInFilter = null;
1690    private DataInputStream valIn = null;
1691    private Decompressor valDecompressor = null;
1692    
1693    private Deserializer keyDeserializer;
1694    private Deserializer valDeserializer;
1695
1696    /**
1697     * A tag interface for all of the Reader options
1698     */
1699    public static interface Option {}
1700    
1701    /**
1702     * Create an option to specify the path name of the sequence file.
1703     * @param value the path to read
1704     * @return a new option
1705     */
1706    public static Option file(Path value) {
1707      return new FileOption(value);
1708    }
1709    
1710    /**
1711     * Create an option to specify the stream with the sequence file.
1712     * @param value the stream to read.
1713     * @return a new option
1714     */
1715    public static Option stream(FSDataInputStream value) {
1716      return new InputStreamOption(value);
1717    }
1718    
1719    /**
1720     * Create an option to specify the starting byte to read.
1721     * @param value the number of bytes to skip over
1722     * @return a new option
1723     */
1724    public static Option start(long value) {
1725      return new StartOption(value);
1726    }
1727    
1728    /**
1729     * Create an option to specify the number of bytes to read.
1730     * @param value the number of bytes to read
1731     * @return a new option
1732     */
1733    public static Option length(long value) {
1734      return new LengthOption(value);
1735    }
1736    
1737    /**
1738     * Create an option with the buffer size for reading the given pathname.
1739     * @param value the number of bytes to buffer
1740     * @return a new option
1741     */
1742    public static Option bufferSize(int value) {
1743      return new BufferSizeOption(value);
1744    }
1745
1746    private static class FileOption extends Options.PathOption 
1747                                    implements Option {
1748      private FileOption(Path value) {
1749        super(value);
1750      }
1751    }
1752    
1753    private static class InputStreamOption
1754        extends Options.FSDataInputStreamOption 
1755        implements Option {
1756      private InputStreamOption(FSDataInputStream value) {
1757        super(value);
1758      }
1759    }
1760
1761    private static class StartOption extends Options.LongOption
1762                                     implements Option {
1763      private StartOption(long value) {
1764        super(value);
1765      }
1766    }
1767
1768    private static class LengthOption extends Options.LongOption
1769                                      implements Option {
1770      private LengthOption(long value) {
1771        super(value);
1772      }
1773    }
1774
1775    private static class BufferSizeOption extends Options.IntegerOption
1776                                      implements Option {
1777      private BufferSizeOption(int value) {
1778        super(value);
1779      }
1780    }
1781
1782    // only used directly
1783    private static class OnlyHeaderOption extends Options.BooleanOption 
1784                                          implements Option {
1785      private OnlyHeaderOption() {
1786        super(true);
1787      }
1788    }
1789
1790    public Reader(Configuration conf, Option... opts) throws IOException {
1791      // Look up the options, these are null if not set
1792      FileOption fileOpt = Options.getOption(FileOption.class, opts);
1793      InputStreamOption streamOpt = 
1794        Options.getOption(InputStreamOption.class, opts);
1795      StartOption startOpt = Options.getOption(StartOption.class, opts);
1796      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1797      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1798      OnlyHeaderOption headerOnly = 
1799        Options.getOption(OnlyHeaderOption.class, opts);
1800      // check for consistency
1801      if ((fileOpt == null) == (streamOpt == null)) {
1802        throw new 
1803          IllegalArgumentException("File or stream option must be specified");
1804      }
1805      if (fileOpt == null && bufOpt != null) {
1806        throw new IllegalArgumentException("buffer size can only be set when" +
1807                                           " a file is specified.");
1808      }
1809      // figure out the real values
1810      Path filename = null;
1811      FSDataInputStream file;
1812      final long len;
1813      if (fileOpt != null) {
1814        filename = fileOpt.getValue();
1815        FileSystem fs = filename.getFileSystem(conf);
1816        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1817        len = null == lenOpt
1818          ? fs.getFileStatus(filename).getLen()
1819          : lenOpt.getValue();
1820        file = openFile(fs, filename, bufSize, len);
1821      } else {
1822        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1823        file = streamOpt.getValue();
1824      }
1825      long start = startOpt == null ? 0 : startOpt.getValue();
1826      // really set up
1827      initialize(filename, file, start, len, conf, headerOnly != null);
1828    }
1829
1830    /**
1831     * Construct a reader by opening a file from the given file system.
1832     * @param fs The file system used to open the file.
1833     * @param file The file being read.
1834     * @param conf Configuration
1835     * @throws IOException
1836     * @deprecated Use Reader(Configuration, Option...) instead.
1837     */
1838    @Deprecated
1839    public Reader(FileSystem fs, Path file, 
1840                  Configuration conf) throws IOException {
1841      this(conf, file(file.makeQualified(fs)));
1842    }
1843
1844    /**
1845     * Construct a reader by the given input stream.
1846     * @param in An input stream.
1847     * @param buffersize unused
1848     * @param start The starting position.
1849     * @param length The length being read.
1850     * @param conf Configuration
1851     * @throws IOException
1852     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1853     */
1854    @Deprecated
1855    public Reader(FSDataInputStream in, int buffersize,
1856        long start, long length, Configuration conf) throws IOException {
1857      this(conf, stream(in), start(start), length(length));
1858    }
1859
1860    /** Common work of the constructors. */
1861    private void initialize(Path filename, FSDataInputStream in,
1862                            long start, long length, Configuration conf,
1863                            boolean tempReader) throws IOException {
1864      if (in == null) {
1865        throw new IllegalArgumentException("in == null");
1866      }
1867      this.filename = filename == null ? "<unknown>" : filename.toString();
1868      this.in = in;
1869      this.conf = conf;
1870      boolean succeeded = false;
1871      try {
1872        seek(start);
1873        this.end = this.in.getPos() + length;
1874        // if it wrapped around, use the max
1875        if (end < length) {
1876          end = Long.MAX_VALUE;
1877        }
1878        init(tempReader);
1879        succeeded = true;
1880      } finally {
1881        if (!succeeded) {
1882          IOUtils.cleanup(LOG, this.in);
1883        }
1884      }
1885    }
1886
1887    /**
1888     * Override this method to specialize the type of
1889     * {@link FSDataInputStream} returned.
1890     * @param fs The file system used to open the file.
1891     * @param file The file being read.
1892     * @param bufferSize The buffer size used to read the file.
1893     * @param length The length being read if it is >= 0.  Otherwise,
1894     *               the length is not available.
1895     * @return The opened stream.
1896     * @throws IOException
1897     */
1898    protected FSDataInputStream openFile(FileSystem fs, Path file,
1899        int bufferSize, long length) throws IOException {
1900      return fs.open(file, bufferSize);
1901    }
1902    
1903    /**
1904     * Initialize the {@link Reader}
1905     * @param tmpReader <code>true</code> if we are constructing a temporary
1906     *                  reader {@link SequenceFile.Sorter.cloneFileAttributes}, 
1907     *                  and hence do not initialize every component; 
1908     *                  <code>false</code> otherwise.
1909     * @throws IOException
1910     */
1911    private void init(boolean tempReader) throws IOException {
1912      byte[] versionBlock = new byte[VERSION.length];
1913      in.readFully(versionBlock);
1914
1915      if ((versionBlock[0] != VERSION[0]) ||
1916          (versionBlock[1] != VERSION[1]) ||
1917          (versionBlock[2] != VERSION[2]))
1918        throw new IOException(this + " not a SequenceFile");
1919
1920      // Set 'version'
1921      version = versionBlock[3];
1922      if (version > VERSION[3])
1923        throw new VersionMismatchException(VERSION[3], version);
1924
1925      if (version < BLOCK_COMPRESS_VERSION) {
1926        UTF8 className = new UTF8();
1927
1928        className.readFields(in);
1929        keyClassName = className.toStringChecked(); // key class name
1930
1931        className.readFields(in);
1932        valClassName = className.toStringChecked(); // val class name
1933      } else {
1934        keyClassName = Text.readString(in);
1935        valClassName = Text.readString(in);
1936      }
1937
1938      if (version > 2) {                          // if version > 2
1939        this.decompress = in.readBoolean();       // is compressed?
1940      } else {
1941        decompress = false;
1942      }
1943
1944      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1945        this.blockCompressed = in.readBoolean();  // is block-compressed?
1946      } else {
1947        blockCompressed = false;
1948      }
1949      
1950      // if version >= 5
1951      // setup the compression codec
1952      if (decompress) {
1953        if (version >= CUSTOM_COMPRESS_VERSION) {
1954          String codecClassname = Text.readString(in);
1955          try {
1956            Class<? extends CompressionCodec> codecClass
1957              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1958            this.codec = ReflectionUtils.newInstance(codecClass, conf);
1959          } catch (ClassNotFoundException cnfe) {
1960            throw new IllegalArgumentException("Unknown codec: " + 
1961                                               codecClassname, cnfe);
1962          }
1963        } else {
1964          codec = new DefaultCodec();
1965          ((Configurable)codec).setConf(conf);
1966        }
1967      }
1968      
1969      this.metadata = new Metadata();
1970      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1971        this.metadata.readFields(in);
1972      }
1973      
1974      if (version > 1) {                          // if version > 1
1975        in.readFully(sync);                       // read sync bytes
1976        headerEnd = in.getPos();                  // record end of header
1977      }
1978      
1979      // Initialize... *not* if this we are constructing a temporary Reader
1980      if (!tempReader) {
1981        valBuffer = new DataInputBuffer();
1982        if (decompress) {
1983          valDecompressor = CodecPool.getDecompressor(codec);
1984          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1985          valIn = new DataInputStream(valInFilter);
1986        } else {
1987          valIn = valBuffer;
1988        }
1989
1990        if (blockCompressed) {
1991          keyLenBuffer = new DataInputBuffer();
1992          keyBuffer = new DataInputBuffer();
1993          valLenBuffer = new DataInputBuffer();
1994
1995          keyLenDecompressor = CodecPool.getDecompressor(codec);
1996          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
1997                                                   keyLenDecompressor);
1998          keyLenIn = new DataInputStream(keyLenInFilter);
1999
2000          keyDecompressor = CodecPool.getDecompressor(codec);
2001          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
2002          keyIn = new DataInputStream(keyInFilter);
2003
2004          valLenDecompressor = CodecPool.getDecompressor(codec);
2005          valLenInFilter = codec.createInputStream(valLenBuffer, 
2006                                                   valLenDecompressor);
2007          valLenIn = new DataInputStream(valLenInFilter);
2008        }
2009        
2010        SerializationFactory serializationFactory =
2011          new SerializationFactory(conf);
2012        this.keyDeserializer =
2013          getDeserializer(serializationFactory, getKeyClass());
2014        if (this.keyDeserializer == null) {
2015          throw new IOException(
2016              "Could not find a deserializer for the Key class: '"
2017                  + getKeyClass().getCanonicalName() + "'. "
2018                  + "Please ensure that the configuration '" +
2019                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2020                  + "properly configured, if you're using "
2021                  + "custom serialization.");
2022        }
2023        if (!blockCompressed) {
2024          this.keyDeserializer.open(valBuffer);
2025        } else {
2026          this.keyDeserializer.open(keyIn);
2027        }
2028        this.valDeserializer =
2029          getDeserializer(serializationFactory, getValueClass());
2030        if (this.valDeserializer == null) {
2031          throw new IOException(
2032              "Could not find a deserializer for the Value class: '"
2033                  + getValueClass().getCanonicalName() + "'. "
2034                  + "Please ensure that the configuration '" +
2035                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2036                  + "properly configured, if you're using "
2037                  + "custom serialization.");
2038        }
2039        this.valDeserializer.open(valIn);
2040      }
2041    }
2042    
2043    @SuppressWarnings("unchecked")
2044    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
2045      return sf.getDeserializer(c);
2046    }
2047    
2048    /** Close the file. */
2049    @Override
2050    public synchronized void close() throws IOException {
2051      // Return the decompressors to the pool
2052      CodecPool.returnDecompressor(keyLenDecompressor);
2053      CodecPool.returnDecompressor(keyDecompressor);
2054      CodecPool.returnDecompressor(valLenDecompressor);
2055      CodecPool.returnDecompressor(valDecompressor);
2056      keyLenDecompressor = keyDecompressor = null;
2057      valLenDecompressor = valDecompressor = null;
2058      
2059      if (keyDeserializer != null) {
2060        keyDeserializer.close();
2061      }
2062      if (valDeserializer != null) {
2063        valDeserializer.close();
2064      }
2065      
2066      // Close the input-stream
2067      in.close();
2068    }
2069
2070    /** Returns the name of the key class. */
2071    public String getKeyClassName() {
2072      return keyClassName;
2073    }
2074
2075    /** Returns the class of keys in this file. */
2076    public synchronized Class<?> getKeyClass() {
2077      if (null == keyClass) {
2078        try {
2079          keyClass = WritableName.getClass(getKeyClassName(), conf);
2080        } catch (IOException e) {
2081          throw new RuntimeException(e);
2082        }
2083      }
2084      return keyClass;
2085    }
2086
2087    /** Returns the name of the value class. */
2088    public String getValueClassName() {
2089      return valClassName;
2090    }
2091
2092    /** Returns the class of values in this file. */
2093    public synchronized Class<?> getValueClass() {
2094      if (null == valClass) {
2095        try {
2096          valClass = WritableName.getClass(getValueClassName(), conf);
2097        } catch (IOException e) {
2098          throw new RuntimeException(e);
2099        }
2100      }
2101      return valClass;
2102    }
2103
2104    /** Returns true if values are compressed. */
2105    public boolean isCompressed() { return decompress; }
2106    
2107    /** Returns true if records are block-compressed. */
2108    public boolean isBlockCompressed() { return blockCompressed; }
2109    
2110    /** Returns the compression codec of data in this file. */
2111    public CompressionCodec getCompressionCodec() { return codec; }
2112    
2113    private byte[] getSync() {
2114      return sync;
2115    }
2116
2117    private byte getVersion() {
2118      return version;
2119    }
2120
2121    /**
2122     * Get the compression type for this file.
2123     * @return the compression type
2124     */
2125    public CompressionType getCompressionType() {
2126      if (decompress) {
2127        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2128      } else {
2129        return CompressionType.NONE;
2130      }
2131    }
2132
2133    /** Returns the metadata object of the file */
2134    public Metadata getMetadata() {
2135      return this.metadata;
2136    }
2137    
2138    /** Returns the configuration used for this file. */
2139    Configuration getConf() { return conf; }
2140    
2141    /** Read a compressed buffer */
2142    private synchronized void readBuffer(DataInputBuffer buffer, 
2143                                         CompressionInputStream filter) throws IOException {
2144      // Read data into a temporary buffer
2145      DataOutputBuffer dataBuffer = new DataOutputBuffer();
2146
2147      try {
2148        int dataBufferLength = WritableUtils.readVInt(in);
2149        dataBuffer.write(in, dataBufferLength);
2150      
2151        // Set up 'buffer' connected to the input-stream
2152        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2153      } finally {
2154        dataBuffer.close();
2155      }
2156
2157      // Reset the codec
2158      filter.resetState();
2159    }
2160    
2161    /** Read the next 'compressed' block */
2162    private synchronized void readBlock() throws IOException {
2163      // Check if we need to throw away a whole block of 
2164      // 'values' due to 'lazy decompression' 
2165      if (lazyDecompress && !valuesDecompressed) {
2166        in.seek(WritableUtils.readVInt(in)+in.getPos());
2167        in.seek(WritableUtils.readVInt(in)+in.getPos());
2168      }
2169      
2170      // Reset internal states
2171      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2172      valuesDecompressed = false;
2173
2174      //Process sync
2175      if (sync != null) {
2176        in.readInt();
2177        in.readFully(syncCheck);                // read syncCheck
2178        if (!Arrays.equals(sync, syncCheck))    // check it
2179          throw new IOException("File is corrupt!");
2180      }
2181      syncSeen = true;
2182
2183      // Read number of records in this block
2184      noBufferedRecords = WritableUtils.readVInt(in);
2185      
2186      // Read key lengths and keys
2187      readBuffer(keyLenBuffer, keyLenInFilter);
2188      readBuffer(keyBuffer, keyInFilter);
2189      noBufferedKeys = noBufferedRecords;
2190      
2191      // Read value lengths and values
2192      if (!lazyDecompress) {
2193        readBuffer(valLenBuffer, valLenInFilter);
2194        readBuffer(valBuffer, valInFilter);
2195        noBufferedValues = noBufferedRecords;
2196        valuesDecompressed = true;
2197      }
2198    }
2199
2200    /** 
2201     * Position valLenIn/valIn to the 'value' 
2202     * corresponding to the 'current' key 
2203     */
2204    private synchronized void seekToCurrentValue() throws IOException {
2205      if (!blockCompressed) {
2206        if (decompress) {
2207          valInFilter.resetState();
2208        }
2209        valBuffer.reset();
2210      } else {
2211        // Check if this is the first value in the 'block' to be read
2212        if (lazyDecompress && !valuesDecompressed) {
2213          // Read the value lengths and values
2214          readBuffer(valLenBuffer, valLenInFilter);
2215          readBuffer(valBuffer, valInFilter);
2216          noBufferedValues = noBufferedRecords;
2217          valuesDecompressed = true;
2218        }
2219        
2220        // Calculate the no. of bytes to skip
2221        // Note: 'current' key has already been read!
2222        int skipValBytes = 0;
2223        int currentKey = noBufferedKeys + 1;          
2224        for (int i=noBufferedValues; i > currentKey; --i) {
2225          skipValBytes += WritableUtils.readVInt(valLenIn);
2226          --noBufferedValues;
2227        }
2228        
2229        // Skip to the 'val' corresponding to 'current' key
2230        if (skipValBytes > 0) {
2231          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2232            throw new IOException("Failed to seek to " + currentKey + 
2233                                  "(th) value!");
2234          }
2235        }
2236      }
2237    }
2238
2239    /**
2240     * Get the 'value' corresponding to the last read 'key'.
2241     * @param val : The 'value' to be read.
2242     * @throws IOException
2243     */
2244    public synchronized void getCurrentValue(Writable val) 
2245      throws IOException {
2246      if (val instanceof Configurable) {
2247        ((Configurable) val).setConf(this.conf);
2248      }
2249
2250      // Position stream to 'current' value
2251      seekToCurrentValue();
2252
2253      if (!blockCompressed) {
2254        val.readFields(valIn);
2255        
2256        if (valIn.read() > 0) {
2257          LOG.info("available bytes: " + valIn.available());
2258          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2259                                + " bytes, should read " +
2260                                (valBuffer.getLength()-keyLength));
2261        }
2262      } else {
2263        // Get the value
2264        int valLength = WritableUtils.readVInt(valLenIn);
2265        val.readFields(valIn);
2266        
2267        // Read another compressed 'value'
2268        --noBufferedValues;
2269        
2270        // Sanity check
2271        if ((valLength < 0) && LOG.isDebugEnabled()) {
2272          LOG.debug(val + " is a zero-length value");
2273        }
2274      }
2275
2276    }
2277    
2278    /**
2279     * Get the 'value' corresponding to the last read 'key'.
2280     * @param val : The 'value' to be read.
2281     * @throws IOException
2282     */
2283    public synchronized Object getCurrentValue(Object val) 
2284      throws IOException {
2285      if (val instanceof Configurable) {
2286        ((Configurable) val).setConf(this.conf);
2287      }
2288
2289      // Position stream to 'current' value
2290      seekToCurrentValue();
2291
2292      if (!blockCompressed) {
2293        val = deserializeValue(val);
2294        
2295        if (valIn.read() > 0) {
2296          LOG.info("available bytes: " + valIn.available());
2297          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2298                                + " bytes, should read " +
2299                                (valBuffer.getLength()-keyLength));
2300        }
2301      } else {
2302        // Get the value
2303        int valLength = WritableUtils.readVInt(valLenIn);
2304        val = deserializeValue(val);
2305        
2306        // Read another compressed 'value'
2307        --noBufferedValues;
2308        
2309        // Sanity check
2310        if ((valLength < 0) && LOG.isDebugEnabled()) {
2311          LOG.debug(val + " is a zero-length value");
2312        }
2313      }
2314      return val;
2315
2316    }
2317
2318    @SuppressWarnings("unchecked")
2319    private Object deserializeValue(Object val) throws IOException {
2320      return valDeserializer.deserialize(val);
2321    }
2322    
2323    /** Read the next key in the file into <code>key</code>, skipping its
2324     * value.  True if another entry exists, and false at end of file. */
2325    public synchronized boolean next(Writable key) throws IOException {
2326      if (key.getClass() != getKeyClass())
2327        throw new IOException("wrong key class: "+key.getClass().getName()
2328                              +" is not "+keyClass);
2329
2330      if (!blockCompressed) {
2331        outBuf.reset();
2332        
2333        keyLength = next(outBuf);
2334        if (keyLength < 0)
2335          return false;
2336        
2337        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2338        
2339        key.readFields(valBuffer);
2340        valBuffer.mark(0);
2341        if (valBuffer.getPosition() != keyLength)
2342          throw new IOException(key + " read " + valBuffer.getPosition()
2343                                + " bytes, should read " + keyLength);
2344      } else {
2345        //Reset syncSeen
2346        syncSeen = false;
2347        
2348        if (noBufferedKeys == 0) {
2349          try {
2350            readBlock();
2351          } catch (EOFException eof) {
2352            return false;
2353          }
2354        }
2355        
2356        int keyLength = WritableUtils.readVInt(keyLenIn);
2357        
2358        // Sanity check
2359        if (keyLength < 0) {
2360          return false;
2361        }
2362        
2363        //Read another compressed 'key'
2364        key.readFields(keyIn);
2365        --noBufferedKeys;
2366      }
2367
2368      return true;
2369    }
2370
2371    /** Read the next key/value pair in the file into <code>key</code> and
2372     * <code>val</code>.  Returns true if such a pair exists and false when at
2373     * end of file */
2374    public synchronized boolean next(Writable key, Writable val)
2375      throws IOException {
2376      if (val.getClass() != getValueClass())
2377        throw new IOException("wrong value class: "+val+" is not "+valClass);
2378
2379      boolean more = next(key);
2380      
2381      if (more) {
2382        getCurrentValue(val);
2383      }
2384
2385      return more;
2386    }
2387    
2388    /**
2389     * Read and return the next record length, potentially skipping over 
2390     * a sync block.
2391     * @return the length of the next record or -1 if there is no next record
2392     * @throws IOException
2393     */
2394    private synchronized int readRecordLength() throws IOException {
2395      if (in.getPos() >= end) {
2396        return -1;
2397      }      
2398      int length = in.readInt();
2399      if (version > 1 && sync != null &&
2400          length == SYNC_ESCAPE) {              // process a sync entry
2401        in.readFully(syncCheck);                // read syncCheck
2402        if (!Arrays.equals(sync, syncCheck))    // check it
2403          throw new IOException("File is corrupt!");
2404        syncSeen = true;
2405        if (in.getPos() >= end) {
2406          return -1;
2407        }
2408        length = in.readInt();                  // re-read length
2409      } else {
2410        syncSeen = false;
2411      }
2412      
2413      return length;
2414    }
2415    
2416    /** Read the next key/value pair in the file into <code>buffer</code>.
2417     * Returns the length of the key read, or -1 if at end of file.  The length
2418     * of the value may be computed by calling buffer.getLength() before and
2419     * after calls to this method. */
2420    /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2421    @Deprecated
2422    synchronized int next(DataOutputBuffer buffer) throws IOException {
2423      // Unsupported for block-compressed sequence files
2424      if (blockCompressed) {
2425        throw new IOException("Unsupported call for block-compressed" +
2426                              " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
2427      }
2428      try {
2429        int length = readRecordLength();
2430        if (length == -1) {
2431          return -1;
2432        }
2433        int keyLength = in.readInt();
2434        buffer.write(in, length);
2435        return keyLength;
2436      } catch (ChecksumException e) {             // checksum failure
2437        handleChecksumException(e);
2438        return next(buffer);
2439      }
2440    }
2441
2442    public ValueBytes createValueBytes() {
2443      ValueBytes val = null;
2444      if (!decompress || blockCompressed) {
2445        val = new UncompressedBytes();
2446      } else {
2447        val = new CompressedBytes(codec);
2448      }
2449      return val;
2450    }
2451
2452    /**
2453     * Read 'raw' records.
2454     * @param key - The buffer into which the key is read
2455     * @param val - The 'raw' value
2456     * @return Returns the total record length or -1 for end of file
2457     * @throws IOException
2458     */
2459    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2460      throws IOException {
2461      if (!blockCompressed) {
2462        int length = readRecordLength();
2463        if (length == -1) {
2464          return -1;
2465        }
2466        int keyLength = in.readInt();
2467        int valLength = length - keyLength;
2468        key.write(in, keyLength);
2469        if (decompress) {
2470          CompressedBytes value = (CompressedBytes)val;
2471          value.reset(in, valLength);
2472        } else {
2473          UncompressedBytes value = (UncompressedBytes)val;
2474          value.reset(in, valLength);
2475        }
2476        
2477        return length;
2478      } else {
2479        //Reset syncSeen
2480        syncSeen = false;
2481        
2482        // Read 'key'
2483        if (noBufferedKeys == 0) {
2484          if (in.getPos() >= end) 
2485            return -1;
2486
2487          try { 
2488            readBlock();
2489          } catch (EOFException eof) {
2490            return -1;
2491          }
2492        }
2493        int keyLength = WritableUtils.readVInt(keyLenIn);
2494        if (keyLength < 0) {
2495          throw new IOException("zero length key found!");
2496        }
2497        key.write(keyIn, keyLength);
2498        --noBufferedKeys;
2499        
2500        // Read raw 'value'
2501        seekToCurrentValue();
2502        int valLength = WritableUtils.readVInt(valLenIn);
2503        UncompressedBytes rawValue = (UncompressedBytes)val;
2504        rawValue.reset(valIn, valLength);
2505        --noBufferedValues;
2506        
2507        return (keyLength+valLength);
2508      }
2509      
2510    }
2511
2512    /**
2513     * Read 'raw' keys.
2514     * @param key - The buffer into which the key is read
2515     * @return Returns the key length or -1 for end of file
2516     * @throws IOException
2517     */
2518    public synchronized int nextRawKey(DataOutputBuffer key) 
2519      throws IOException {
2520      if (!blockCompressed) {
2521        recordLength = readRecordLength();
2522        if (recordLength == -1) {
2523          return -1;
2524        }
2525        keyLength = in.readInt();
2526        key.write(in, keyLength);
2527        return keyLength;
2528      } else {
2529        //Reset syncSeen
2530        syncSeen = false;
2531        
2532        // Read 'key'
2533        if (noBufferedKeys == 0) {
2534          if (in.getPos() >= end) 
2535            return -1;
2536
2537          try { 
2538            readBlock();
2539          } catch (EOFException eof) {
2540            return -1;
2541          }
2542        }
2543        int keyLength = WritableUtils.readVInt(keyLenIn);
2544        if (keyLength < 0) {
2545          throw new IOException("zero length key found!");
2546        }
2547        key.write(keyIn, keyLength);
2548        --noBufferedKeys;
2549        
2550        return keyLength;
2551      }
2552      
2553    }
2554
2555    /** Read the next key in the file, skipping its
2556     * value.  Return null at end of file. */
2557    public synchronized Object next(Object key) throws IOException {
2558      if (key != null && key.getClass() != getKeyClass()) {
2559        throw new IOException("wrong key class: "+key.getClass().getName()
2560                              +" is not "+keyClass);
2561      }
2562
2563      if (!blockCompressed) {
2564        outBuf.reset();
2565        
2566        keyLength = next(outBuf);
2567        if (keyLength < 0)
2568          return null;
2569        
2570        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2571        
2572        key = deserializeKey(key);
2573        valBuffer.mark(0);
2574        if (valBuffer.getPosition() != keyLength)
2575          throw new IOException(key + " read " + valBuffer.getPosition()
2576                                + " bytes, should read " + keyLength);
2577      } else {
2578        //Reset syncSeen
2579        syncSeen = false;
2580        
2581        if (noBufferedKeys == 0) {
2582          try {
2583            readBlock();
2584          } catch (EOFException eof) {
2585            return null;
2586          }
2587        }
2588        
2589        int keyLength = WritableUtils.readVInt(keyLenIn);
2590        
2591        // Sanity check
2592        if (keyLength < 0) {
2593          return null;
2594        }
2595        
2596        //Read another compressed 'key'
2597        key = deserializeKey(key);
2598        --noBufferedKeys;
2599      }
2600
2601      return key;
2602    }
2603
2604    @SuppressWarnings("unchecked")
2605    private Object deserializeKey(Object key) throws IOException {
2606      return keyDeserializer.deserialize(key);
2607    }
2608
2609    /**
2610     * Read 'raw' values.
2611     * @param val - The 'raw' value
2612     * @return Returns the value length
2613     * @throws IOException
2614     */
2615    public synchronized int nextRawValue(ValueBytes val) 
2616      throws IOException {
2617      
2618      // Position stream to current value
2619      seekToCurrentValue();
2620 
2621      if (!blockCompressed) {
2622        int valLength = recordLength - keyLength;
2623        if (decompress) {
2624          CompressedBytes value = (CompressedBytes)val;
2625          value.reset(in, valLength);
2626        } else {
2627          UncompressedBytes value = (UncompressedBytes)val;
2628          value.reset(in, valLength);
2629        }
2630         
2631        return valLength;
2632      } else {
2633        int valLength = WritableUtils.readVInt(valLenIn);
2634        UncompressedBytes rawValue = (UncompressedBytes)val;
2635        rawValue.reset(valIn, valLength);
2636        --noBufferedValues;
2637        return valLength;
2638      }
2639      
2640    }
2641
2642    private void handleChecksumException(ChecksumException e)
2643      throws IOException {
2644      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2645        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2646        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2647      } else {
2648        throw e;
2649      }
2650    }
2651
2652    /** disables sync. often invoked for tmp files */
2653    synchronized void ignoreSync() {
2654      sync = null;
2655    }
2656    
2657    /** Set the current byte position in the input file.
2658     *
2659     * <p>The position passed must be a position returned by {@link
2660     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2661     * position, use {@link SequenceFile.Reader#sync(long)}.
2662     */
2663    public synchronized void seek(long position) throws IOException {
2664      in.seek(position);
2665      if (blockCompressed) {                      // trigger block read
2666        noBufferedKeys = 0;
2667        valuesDecompressed = true;
2668      }
2669    }
2670
2671    /** Seek to the next sync mark past a given position.*/
2672    public synchronized void sync(long position) throws IOException {
2673      if (position+SYNC_SIZE >= end) {
2674        seek(end);
2675        return;
2676      }
2677
2678      if (position < headerEnd) {
2679        // seek directly to first record
2680        in.seek(headerEnd);
2681        // note the sync marker "seen" in the header
2682        syncSeen = true;
2683        return;
2684      }
2685
2686      try {
2687        seek(position+4);                         // skip escape
2688        in.readFully(syncCheck);
2689        int syncLen = sync.length;
2690        for (int i = 0; in.getPos() < end; i++) {
2691          int j = 0;
2692          for (; j < syncLen; j++) {
2693            if (sync[j] != syncCheck[(i+j)%syncLen])
2694              break;
2695          }
2696          if (j == syncLen) {
2697            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2698            return;
2699          }
2700          syncCheck[i%syncLen] = in.readByte();
2701        }
2702      } catch (ChecksumException e) {             // checksum failure
2703        handleChecksumException(e);
2704      }
2705    }
2706
2707    /** Returns true iff the previous call to next passed a sync mark.*/
2708    public synchronized boolean syncSeen() { return syncSeen; }
2709
2710    /** Return the current byte position in the input file. */
2711    public synchronized long getPosition() throws IOException {
2712      return in.getPos();
2713    }
2714
2715    /** Returns the name of the file. */
2716    @Override
2717    public String toString() {
2718      return filename;
2719    }
2720
2721  }
2722
2723  /** Sorts key/value pairs in a sequence-format file.
2724   *
2725   * <p>For best performance, applications should make sure that the {@link
2726   * Writable#readFields(DataInput)} implementation of their keys is
2727   * very efficient.  In particular, it should avoid allocating memory.
2728   */
2729  public static class Sorter {
2730
2731    private RawComparator comparator;
2732
2733    private MergeSort mergeSort; //the implementation of merge sort
2734    
2735    private Path[] inFiles;                     // when merging or sorting
2736
2737    private Path outFile;
2738
2739    private int memory; // bytes
2740    private int factor; // merged per pass
2741
2742    private FileSystem fs = null;
2743
2744    private Class keyClass;
2745    private Class valClass;
2746
2747    private Configuration conf;
2748    private Metadata metadata;
2749    
2750    private Progressable progressable = null;
2751
2752    /** Sort and merge files containing the named classes. */
2753    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2754                  Class valClass, Configuration conf)  {
2755      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
2756    }
2757
2758    /** Sort and merge using an arbitrary {@link RawComparator}. */
2759    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2760                  Class valClass, Configuration conf) {
2761      this(fs, comparator, keyClass, valClass, conf, new Metadata());
2762    }
2763
2764    /** Sort and merge using an arbitrary {@link RawComparator}. */
2765    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2766                  Class valClass, Configuration conf, Metadata metadata) {
2767      this.fs = fs;
2768      this.comparator = comparator;
2769      this.keyClass = keyClass;
2770      this.valClass = valClass;
2771      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2772      this.factor = conf.getInt("io.sort.factor", 100);
2773      this.conf = conf;
2774      this.metadata = metadata;
2775    }
2776
2777    /** Set the number of streams to merge at once.*/
2778    public void setFactor(int factor) { this.factor = factor; }
2779
2780    /** Get the number of streams to merge at once.*/
2781    public int getFactor() { return factor; }
2782
2783    /** Set the total amount of buffer memory, in bytes.*/
2784    public void setMemory(int memory) { this.memory = memory; }
2785
2786    /** Get the total amount of buffer memory, in bytes.*/
2787    public int getMemory() { return memory; }
2788
2789    /** Set the progressable object in order to report progress. */
2790    public void setProgressable(Progressable progressable) {
2791      this.progressable = progressable;
2792    }
2793    
2794    /** 
2795     * Perform a file sort from a set of input files into an output file.
2796     * @param inFiles the files to be sorted
2797     * @param outFile the sorted output file
2798     * @param deleteInput should the input files be deleted as they are read?
2799     */
2800    public void sort(Path[] inFiles, Path outFile,
2801                     boolean deleteInput) throws IOException {
2802      if (fs.exists(outFile)) {
2803        throw new IOException("already exists: " + outFile);
2804      }
2805
2806      this.inFiles = inFiles;
2807      this.outFile = outFile;
2808
2809      int segments = sortPass(deleteInput);
2810      if (segments > 1) {
2811        mergePass(outFile.getParent());
2812      }
2813    }
2814
2815    /** 
2816     * Perform a file sort from a set of input files and return an iterator.
2817     * @param inFiles the files to be sorted
2818     * @param tempDir the directory where temp files are created during sort
2819     * @param deleteInput should the input files be deleted as they are read?
2820     * @return iterator the RawKeyValueIterator
2821     */
2822    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2823                                              boolean deleteInput) throws IOException {
2824      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2825      if (fs.exists(outFile)) {
2826        throw new IOException("already exists: " + outFile);
2827      }
2828      this.inFiles = inFiles;
2829      //outFile will basically be used as prefix for temp files in the cases
2830      //where sort outputs multiple sorted segments. For the single segment
2831      //case, the outputFile itself will contain the sorted data for that
2832      //segment
2833      this.outFile = outFile;
2834
2835      int segments = sortPass(deleteInput);
2836      if (segments > 1)
2837        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2838                     tempDir);
2839      else if (segments == 1)
2840        return merge(new Path[]{outFile}, true, tempDir);
2841      else return null;
2842    }
2843
2844    /**
2845     * The backwards compatible interface to sort.
2846     * @param inFile the input file to sort
2847     * @param outFile the sorted output file
2848     */
2849    public void sort(Path inFile, Path outFile) throws IOException {
2850      sort(new Path[]{inFile}, outFile, false);
2851    }
2852    
2853    private int sortPass(boolean deleteInput) throws IOException {
2854      if(LOG.isDebugEnabled()) {
2855        LOG.debug("running sort pass");
2856      }
2857      SortPass sortPass = new SortPass();         // make the SortPass
2858      sortPass.setProgressable(progressable);
2859      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2860      try {
2861        return sortPass.run(deleteInput);         // run it
2862      } finally {
2863        sortPass.close();                         // close it
2864      }
2865    }
2866
2867    private class SortPass {
2868      private int memoryLimit = memory/4;
2869      private int recordLimit = 1000000;
2870      
2871      private DataOutputBuffer rawKeys = new DataOutputBuffer();
2872      private byte[] rawBuffer;
2873
2874      private int[] keyOffsets = new int[1024];
2875      private int[] pointers = new int[keyOffsets.length];
2876      private int[] pointersCopy = new int[keyOffsets.length];
2877      private int[] keyLengths = new int[keyOffsets.length];
2878      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2879      
2880      private ArrayList segmentLengths = new ArrayList();
2881      
2882      private Reader in = null;
2883      private FSDataOutputStream out = null;
2884      private FSDataOutputStream indexOut = null;
2885      private Path outName;
2886
2887      private Progressable progressable = null;
2888
2889      public int run(boolean deleteInput) throws IOException {
2890        int segments = 0;
2891        int currentFile = 0;
2892        boolean atEof = (currentFile >= inFiles.length);
2893        CompressionType compressionType;
2894        CompressionCodec codec = null;
2895        segmentLengths.clear();
2896        if (atEof) {
2897          return 0;
2898        }
2899        
2900        // Initialize
2901        in = new Reader(fs, inFiles[currentFile], conf);
2902        compressionType = in.getCompressionType();
2903        codec = in.getCompressionCodec();
2904        
2905        for (int i=0; i < rawValues.length; ++i) {
2906          rawValues[i] = null;
2907        }
2908        
2909        while (!atEof) {
2910          int count = 0;
2911          int bytesProcessed = 0;
2912          rawKeys.reset();
2913          while (!atEof && 
2914                 bytesProcessed < memoryLimit && count < recordLimit) {
2915
2916            // Read a record into buffer
2917            // Note: Attempt to re-use 'rawValue' as far as possible
2918            int keyOffset = rawKeys.getLength();       
2919            ValueBytes rawValue = 
2920              (count == keyOffsets.length || rawValues[count] == null) ? 
2921              in.createValueBytes() : 
2922              rawValues[count];
2923            int recordLength = in.nextRaw(rawKeys, rawValue);
2924            if (recordLength == -1) {
2925              in.close();
2926              if (deleteInput) {
2927                fs.delete(inFiles[currentFile], true);
2928              }
2929              currentFile += 1;
2930              atEof = currentFile >= inFiles.length;
2931              if (!atEof) {
2932                in = new Reader(fs, inFiles[currentFile], conf);
2933              } else {
2934                in = null;
2935              }
2936              continue;
2937            }
2938
2939            int keyLength = rawKeys.getLength() - keyOffset;
2940
2941            if (count == keyOffsets.length)
2942              grow();
2943
2944            keyOffsets[count] = keyOffset;                // update pointers
2945            pointers[count] = count;
2946            keyLengths[count] = keyLength;
2947            rawValues[count] = rawValue;
2948
2949            bytesProcessed += recordLength; 
2950            count++;
2951          }
2952
2953          // buffer is full -- sort & flush it
2954          if(LOG.isDebugEnabled()) {
2955            LOG.debug("flushing segment " + segments);
2956          }
2957          rawBuffer = rawKeys.getData();
2958          sort(count);
2959          // indicate we're making progress
2960          if (progressable != null) {
2961            progressable.progress();
2962          }
2963          flush(count, bytesProcessed, compressionType, codec, 
2964                segments==0 && atEof);
2965          segments++;
2966        }
2967        return segments;
2968      }
2969
2970      public void close() throws IOException {
2971        if (in != null) {
2972          in.close();
2973        }
2974        if (out != null) {
2975          out.close();
2976        }
2977        if (indexOut != null) {
2978          indexOut.close();
2979        }
2980      }
2981
2982      private void grow() {
2983        int newLength = keyOffsets.length * 3 / 2;
2984        keyOffsets = grow(keyOffsets, newLength);
2985        pointers = grow(pointers, newLength);
2986        pointersCopy = new int[newLength];
2987        keyLengths = grow(keyLengths, newLength);
2988        rawValues = grow(rawValues, newLength);
2989      }
2990
2991      private int[] grow(int[] old, int newLength) {
2992        int[] result = new int[newLength];
2993        System.arraycopy(old, 0, result, 0, old.length);
2994        return result;
2995      }
2996      
2997      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
2998        ValueBytes[] result = new ValueBytes[newLength];
2999        System.arraycopy(old, 0, result, 0, old.length);
3000        for (int i=old.length; i < newLength; ++i) {
3001          result[i] = null;
3002        }
3003        return result;
3004      }
3005
3006      private void flush(int count, int bytesProcessed, 
3007                         CompressionType compressionType, 
3008                         CompressionCodec codec, 
3009                         boolean done) throws IOException {
3010        if (out == null) {
3011          outName = done ? outFile : outFile.suffix(".0");
3012          out = fs.create(outName);
3013          if (!done) {
3014            indexOut = fs.create(outName.suffix(".index"));
3015          }
3016        }
3017
3018        long segmentStart = out.getPos();
3019        Writer writer = createWriter(conf, Writer.stream(out), 
3020            Writer.keyClass(keyClass), Writer.valueClass(valClass),
3021            Writer.compression(compressionType, codec),
3022            Writer.metadata(done ? metadata : new Metadata()));
3023        
3024        if (!done) {
3025          writer.sync = null;                     // disable sync on temp files
3026        }
3027
3028        for (int i = 0; i < count; i++) {         // write in sorted order
3029          int p = pointers[i];
3030          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
3031        }
3032        writer.close();
3033        
3034        if (!done) {
3035          // Save the segment length
3036          WritableUtils.writeVLong(indexOut, segmentStart);
3037          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
3038          indexOut.flush();
3039        }
3040      }
3041
3042      private void sort(int count) {
3043        System.arraycopy(pointers, 0, pointersCopy, 0, count);
3044        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
3045      }
3046      class SeqFileComparator implements Comparator<IntWritable> {
3047        @Override
3048        public int compare(IntWritable I, IntWritable J) {
3049          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
3050                                    keyLengths[I.get()], rawBuffer, 
3051                                    keyOffsets[J.get()], keyLengths[J.get()]);
3052        }
3053      }
3054      
3055      /** set the progressable object in order to report progress */
3056      public void setProgressable(Progressable progressable)
3057      {
3058        this.progressable = progressable;
3059      }
3060      
3061    } // SequenceFile.Sorter.SortPass
3062
3063    /** The interface to iterate over raw keys/values of SequenceFiles. */
3064    public static interface RawKeyValueIterator {
3065      /** Gets the current raw key
3066       * @return DataOutputBuffer
3067       * @throws IOException
3068       */
3069      DataOutputBuffer getKey() throws IOException; 
3070      /** Gets the current raw value
3071       * @return ValueBytes 
3072       * @throws IOException
3073       */
3074      ValueBytes getValue() throws IOException; 
3075      /** Sets up the current key and value (for getKey and getValue)
3076       * @return true if there exists a key/value, false otherwise 
3077       * @throws IOException
3078       */
3079      boolean next() throws IOException;
3080      /** closes the iterator so that the underlying streams can be closed
3081       * @throws IOException
3082       */
3083      void close() throws IOException;
3084      /** Gets the Progress object; this has a float (0.0 - 1.0) 
3085       * indicating the bytes processed by the iterator so far
3086       */
3087      Progress getProgress();
3088    }    
3089    
3090    /**
3091     * Merges the list of segments of type <code>SegmentDescriptor</code>
3092     * @param segments the list of SegmentDescriptors
3093     * @param tmpDir the directory to write temporary files into
3094     * @return RawKeyValueIterator
3095     * @throws IOException
3096     */
3097    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
3098                                     Path tmpDir) 
3099      throws IOException {
3100      // pass in object to report progress, if present
3101      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3102      return mQueue.merge();
3103    }
3104
3105    /**
3106     * Merges the contents of files passed in Path[] using a max factor value
3107     * that is already set
3108     * @param inNames the array of path names
3109     * @param deleteInputs true if the input files should be deleted when 
3110     * unnecessary
3111     * @param tmpDir the directory to write temporary files into
3112     * @return RawKeyValueIteratorMergeQueue
3113     * @throws IOException
3114     */
3115    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3116                                     Path tmpDir) 
3117      throws IOException {
3118      return merge(inNames, deleteInputs, 
3119                   (inNames.length < factor) ? inNames.length : factor,
3120                   tmpDir);
3121    }
3122
3123    /**
3124     * Merges the contents of files passed in Path[]
3125     * @param inNames the array of path names
3126     * @param deleteInputs true if the input files should be deleted when 
3127     * unnecessary
3128     * @param factor the factor that will be used as the maximum merge fan-in
3129     * @param tmpDir the directory to write temporary files into
3130     * @return RawKeyValueIteratorMergeQueue
3131     * @throws IOException
3132     */
3133    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3134                                     int factor, Path tmpDir) 
3135      throws IOException {
3136      //get the segments from inNames
3137      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3138      for (int i = 0; i < inNames.length; i++) {
3139        SegmentDescriptor s = new SegmentDescriptor(0,
3140            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3141        s.preserveInput(!deleteInputs);
3142        s.doSync();
3143        a.add(s);
3144      }
3145      this.factor = factor;
3146      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3147      return mQueue.merge();
3148    }
3149
3150    /**
3151     * Merges the contents of files passed in Path[]
3152     * @param inNames the array of path names
3153     * @param tempDir the directory for creating temp files during merge
3154     * @param deleteInputs true if the input files should be deleted when 
3155     * unnecessary
3156     * @return RawKeyValueIteratorMergeQueue
3157     * @throws IOException
3158     */
3159    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3160                                     boolean deleteInputs) 
3161      throws IOException {
3162      //outFile will basically be used as prefix for temp files for the
3163      //intermediate merge outputs           
3164      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3165      //get the segments from inNames
3166      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3167      for (int i = 0; i < inNames.length; i++) {
3168        SegmentDescriptor s = new SegmentDescriptor(0,
3169            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3170        s.preserveInput(!deleteInputs);
3171        s.doSync();
3172        a.add(s);
3173      }
3174      factor = (inNames.length < factor) ? inNames.length : factor;
3175      // pass in object to report progress, if present
3176      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3177      return mQueue.merge();
3178    }
3179
3180    /**
3181     * Clones the attributes (like compression of the input file and creates a 
3182     * corresponding Writer
3183     * @param inputFile the path of the input file whose attributes should be 
3184     * cloned
3185     * @param outputFile the path of the output file 
3186     * @param prog the Progressable to report status during the file write
3187     * @return Writer
3188     * @throws IOException
3189     */
3190    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3191                                      Progressable prog) throws IOException {
3192      Reader reader = new Reader(conf,
3193                                 Reader.file(inputFile),
3194                                 new Reader.OnlyHeaderOption());
3195      CompressionType compress = reader.getCompressionType();
3196      CompressionCodec codec = reader.getCompressionCodec();
3197      reader.close();
3198
3199      Writer writer = createWriter(conf, 
3200                                   Writer.file(outputFile), 
3201                                   Writer.keyClass(keyClass), 
3202                                   Writer.valueClass(valClass), 
3203                                   Writer.compression(compress, codec), 
3204                                   Writer.progressable(prog));
3205      return writer;
3206    }
3207
3208    /**
3209     * Writes records from RawKeyValueIterator into a file represented by the 
3210     * passed writer
3211     * @param records the RawKeyValueIterator
3212     * @param writer the Writer created earlier 
3213     * @throws IOException
3214     */
3215    public void writeFile(RawKeyValueIterator records, Writer writer) 
3216      throws IOException {
3217      while(records.next()) {
3218        writer.appendRaw(records.getKey().getData(), 0, 
3219                         records.getKey().getLength(), records.getValue());
3220      }
3221      writer.sync();
3222    }
3223        
3224    /** Merge the provided files.
3225     * @param inFiles the array of input path names
3226     * @param outFile the final output file
3227     * @throws IOException
3228     */
3229    public void merge(Path[] inFiles, Path outFile) throws IOException {
3230      if (fs.exists(outFile)) {
3231        throw new IOException("already exists: " + outFile);
3232      }
3233      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3234      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3235      
3236      writeFile(r, writer);
3237
3238      writer.close();
3239    }
3240
3241    /** sort calls this to generate the final merged output */
3242    private int mergePass(Path tmpDir) throws IOException {
3243      if(LOG.isDebugEnabled()) {
3244        LOG.debug("running merge pass");
3245      }
3246      Writer writer = cloneFileAttributes(
3247                                          outFile.suffix(".0"), outFile, null);
3248      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3249                                    outFile.suffix(".0.index"), tmpDir);
3250      writeFile(r, writer);
3251
3252      writer.close();
3253      return 0;
3254    }
3255
3256    /** Used by mergePass to merge the output of the sort
3257     * @param inName the name of the input file containing sorted segments
3258     * @param indexIn the offsets of the sorted segments
3259     * @param tmpDir the relative directory to store intermediate results in
3260     * @return RawKeyValueIterator
3261     * @throws IOException
3262     */
3263    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3264      throws IOException {
3265      //get the segments from indexIn
3266      //we create a SegmentContainer so that we can track segments belonging to
3267      //inName and delete inName as soon as we see that we have looked at all
3268      //the contained segments during the merge process & hence don't need 
3269      //them anymore
3270      SegmentContainer container = new SegmentContainer(inName, indexIn);
3271      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3272      return mQueue.merge();
3273    }
3274    
3275    /** This class implements the core of the merge logic */
3276    private class MergeQueue extends PriorityQueue 
3277      implements RawKeyValueIterator {
3278      private boolean compress;
3279      private boolean blockCompress;
3280      private DataOutputBuffer rawKey = new DataOutputBuffer();
3281      private ValueBytes rawValue;
3282      private long totalBytesProcessed;
3283      private float progPerByte;
3284      private Progress mergeProgress = new Progress();
3285      private Path tmpDir;
3286      private Progressable progress = null; //handle to the progress reporting object
3287      private SegmentDescriptor minSegment;
3288      
3289      //a TreeMap used to store the segments sorted by size (segment offset and
3290      //segment path name is used to break ties between segments of same sizes)
3291      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3292        new TreeMap<SegmentDescriptor, Void>();
3293            
3294      @SuppressWarnings("unchecked")
3295      public void put(SegmentDescriptor stream) throws IOException {
3296        if (size() == 0) {
3297          compress = stream.in.isCompressed();
3298          blockCompress = stream.in.isBlockCompressed();
3299        } else if (compress != stream.in.isCompressed() || 
3300                   blockCompress != stream.in.isBlockCompressed()) {
3301          throw new IOException("All merged files must be compressed or not.");
3302        } 
3303        super.put(stream);
3304      }
3305      
3306      /**
3307       * A queue of file segments to merge
3308       * @param segments the file segments to merge
3309       * @param tmpDir a relative local directory to save intermediate files in
3310       * @param progress the reference to the Progressable object
3311       */
3312      public MergeQueue(List <SegmentDescriptor> segments,
3313          Path tmpDir, Progressable progress) {
3314        int size = segments.size();
3315        for (int i = 0; i < size; i++) {
3316          sortedSegmentSizes.put(segments.get(i), null);
3317        }
3318        this.tmpDir = tmpDir;
3319        this.progress = progress;
3320      }
3321      @Override
3322      protected boolean lessThan(Object a, Object b) {
3323        // indicate we're making progress
3324        if (progress != null) {
3325          progress.progress();
3326        }
3327        SegmentDescriptor msa = (SegmentDescriptor)a;
3328        SegmentDescriptor msb = (SegmentDescriptor)b;
3329        return comparator.compare(msa.getKey().getData(), 0, 
3330                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
3331                                  msb.getKey().getLength()) < 0;
3332      }
3333      @Override
3334      public void close() throws IOException {
3335        SegmentDescriptor ms;                           // close inputs
3336        while ((ms = (SegmentDescriptor)pop()) != null) {
3337          ms.cleanup();
3338        }
3339        minSegment = null;
3340      }
3341      @Override
3342      public DataOutputBuffer getKey() throws IOException {
3343        return rawKey;
3344      }
3345      @Override
3346      public ValueBytes getValue() throws IOException {
3347        return rawValue;
3348      }
3349      @Override
3350      public boolean next() throws IOException {
3351        if (size() == 0)
3352          return false;
3353        if (minSegment != null) {
3354          //minSegment is non-null for all invocations of next except the first
3355          //one. For the first invocation, the priority queue is ready for use
3356          //but for the subsequent invocations, first adjust the queue 
3357          adjustPriorityQueue(minSegment);
3358          if (size() == 0) {
3359            minSegment = null;
3360            return false;
3361          }
3362        }
3363        minSegment = (SegmentDescriptor)top();
3364        long startPos = minSegment.in.getPosition(); // Current position in stream
3365        //save the raw key reference
3366        rawKey = minSegment.getKey();
3367        //load the raw value. Re-use the existing rawValue buffer
3368        if (rawValue == null) {
3369          rawValue = minSegment.in.createValueBytes();
3370        }
3371        minSegment.nextRawValue(rawValue);
3372        long endPos = minSegment.in.getPosition(); // End position after reading value
3373        updateProgress(endPos - startPos);
3374        return true;
3375      }
3376      
3377      @Override
3378      public Progress getProgress() {
3379        return mergeProgress; 
3380      }
3381
3382      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3383        long startPos = ms.in.getPosition(); // Current position in stream
3384        boolean hasNext = ms.nextRawKey();
3385        long endPos = ms.in.getPosition(); // End position after reading key
3386        updateProgress(endPos - startPos);
3387        if (hasNext) {
3388          adjustTop();
3389        } else {
3390          pop();
3391          ms.cleanup();
3392        }
3393      }
3394
3395      private void updateProgress(long bytesProcessed) {
3396        totalBytesProcessed += bytesProcessed;
3397        if (progPerByte > 0) {
3398          mergeProgress.set(totalBytesProcessed * progPerByte);
3399        }
3400      }
3401      
3402      /** This is the single level merge that is called multiple times 
3403       * depending on the factor size and the number of segments
3404       * @return RawKeyValueIterator
3405       * @throws IOException
3406       */
3407      public RawKeyValueIterator merge() throws IOException {
3408        //create the MergeStreams from the sorted map created in the constructor
3409        //and dump the final output to a file
3410        int numSegments = sortedSegmentSizes.size();
3411        int origFactor = factor;
3412        int passNo = 1;
3413        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3414        do {
3415          //get the factor for this pass of merge
3416          factor = getPassFactor(passNo, numSegments);
3417          List<SegmentDescriptor> segmentsToMerge =
3418            new ArrayList<SegmentDescriptor>();
3419          int segmentsConsidered = 0;
3420          int numSegmentsToConsider = factor;
3421          while (true) {
3422            //extract the smallest 'factor' number of segment pointers from the 
3423            //TreeMap. Call cleanup on the empty segments (no key/value data)
3424            SegmentDescriptor[] mStream = 
3425              getSegmentDescriptors(numSegmentsToConsider);
3426            for (int i = 0; i < mStream.length; i++) {
3427              if (mStream[i].nextRawKey()) {
3428                segmentsToMerge.add(mStream[i]);
3429                segmentsConsidered++;
3430                // Count the fact that we read some bytes in calling nextRawKey()
3431                updateProgress(mStream[i].in.getPosition());
3432              }
3433              else {
3434                mStream[i].cleanup();
3435                numSegments--; //we ignore this segment for the merge
3436              }
3437            }
3438            //if we have the desired number of segments
3439            //or looked at all available segments, we break
3440            if (segmentsConsidered == factor || 
3441                sortedSegmentSizes.size() == 0) {
3442              break;
3443            }
3444              
3445            numSegmentsToConsider = factor - segmentsConsidered;
3446          }
3447          //feed the streams to the priority queue
3448          initialize(segmentsToMerge.size()); clear();
3449          for (int i = 0; i < segmentsToMerge.size(); i++) {
3450            put(segmentsToMerge.get(i));
3451          }
3452          //if we have lesser number of segments remaining, then just return the
3453          //iterator, else do another single level merge
3454          if (numSegments <= factor) {
3455            //calculate the length of the remaining segments. Required for 
3456            //calculating the merge progress
3457            long totalBytes = 0;
3458            for (int i = 0; i < segmentsToMerge.size(); i++) {
3459              totalBytes += segmentsToMerge.get(i).segmentLength;
3460            }
3461            if (totalBytes != 0) //being paranoid
3462              progPerByte = 1.0f / (float)totalBytes;
3463            //reset factor to what it originally was
3464            factor = origFactor;
3465            return this;
3466          } else {
3467            //we want to spread the creation of temp files on multiple disks if 
3468            //available under the space constraints
3469            long approxOutputSize = 0; 
3470            for (SegmentDescriptor s : segmentsToMerge) {
3471              approxOutputSize += s.segmentLength + 
3472                                  ChecksumFileSystem.getApproxChkSumLength(
3473                                  s.segmentLength);
3474            }
3475            Path tmpFilename = 
3476              new Path(tmpDir, "intermediate").suffix("." + passNo);
3477
3478            Path outputFile =  lDirAlloc.getLocalPathForWrite(
3479                                                tmpFilename.toString(),
3480                                                approxOutputSize, conf);
3481            if(LOG.isDebugEnabled()) { 
3482              LOG.debug("writing intermediate results to " + outputFile);
3483            }
3484            Writer writer = cloneFileAttributes(
3485                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3486                                                fs.makeQualified(outputFile), null);
3487            writer.sync = null; //disable sync for temp files
3488            writeFile(this, writer);
3489            writer.close();
3490            
3491            //we finished one single level merge; now clean up the priority 
3492            //queue
3493            this.close();
3494            
3495            SegmentDescriptor tempSegment = 
3496              new SegmentDescriptor(0,
3497                  fs.getFileStatus(outputFile).getLen(), outputFile);
3498            //put the segment back in the TreeMap
3499            sortedSegmentSizes.put(tempSegment, null);
3500            numSegments = sortedSegmentSizes.size();
3501            passNo++;
3502          }
3503          //we are worried about only the first pass merge factor. So reset the 
3504          //factor to what it originally was
3505          factor = origFactor;
3506        } while(true);
3507      }
3508  
3509      //Hadoop-591
3510      public int getPassFactor(int passNo, int numSegments) {
3511        if (passNo > 1 || numSegments <= factor || factor == 1) 
3512          return factor;
3513        int mod = (numSegments - 1) % (factor - 1);
3514        if (mod == 0)
3515          return factor;
3516        return mod + 1;
3517      }
3518      
3519      /** Return (& remove) the requested number of segment descriptors from the
3520       * sorted map.
3521       */
3522      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
3523        if (numDescriptors > sortedSegmentSizes.size())
3524          numDescriptors = sortedSegmentSizes.size();
3525        SegmentDescriptor[] SegmentDescriptors = 
3526          new SegmentDescriptor[numDescriptors];
3527        Iterator iter = sortedSegmentSizes.keySet().iterator();
3528        int i = 0;
3529        while (i < numDescriptors) {
3530          SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
3531          iter.remove();
3532        }
3533        return SegmentDescriptors;
3534      }
3535    } // SequenceFile.Sorter.MergeQueue
3536
3537    /** This class defines a merge segment. This class can be subclassed to 
3538     * provide a customized cleanup method implementation. In this 
3539     * implementation, cleanup closes the file handle and deletes the file 
3540     */
3541    public class SegmentDescriptor implements Comparable {
3542      
3543      long segmentOffset; //the start of the segment in the file
3544      long segmentLength; //the length of the segment
3545      Path segmentPathName; //the path name of the file containing the segment
3546      boolean ignoreSync = true; //set to true for temp files
3547      private Reader in = null; 
3548      private DataOutputBuffer rawKey = null; //this will hold the current key
3549      private boolean preserveInput = false; //delete input segment files?
3550      
3551      /** Constructs a segment
3552       * @param segmentOffset the offset of the segment in the file
3553       * @param segmentLength the length of the segment
3554       * @param segmentPathName the path name of the file containing the segment
3555       */
3556      public SegmentDescriptor (long segmentOffset, long segmentLength, 
3557                                Path segmentPathName) {
3558        this.segmentOffset = segmentOffset;
3559        this.segmentLength = segmentLength;
3560        this.segmentPathName = segmentPathName;
3561      }
3562      
3563      /** Do the sync checks */
3564      public void doSync() {ignoreSync = false;}
3565      
3566      /** Whether to delete the files when no longer needed */
3567      public void preserveInput(boolean preserve) {
3568        preserveInput = preserve;
3569      }
3570
3571      public boolean shouldPreserveInput() {
3572        return preserveInput;
3573      }
3574      
3575      @Override
3576      public int compareTo(Object o) {
3577        SegmentDescriptor that = (SegmentDescriptor)o;
3578        if (this.segmentLength != that.segmentLength) {
3579          return (this.segmentLength < that.segmentLength ? -1 : 1);
3580        }
3581        if (this.segmentOffset != that.segmentOffset) {
3582          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3583        }
3584        return (this.segmentPathName.toString()).
3585          compareTo(that.segmentPathName.toString());
3586      }
3587
3588      @Override
3589      public boolean equals(Object o) {
3590        if (!(o instanceof SegmentDescriptor)) {
3591          return false;
3592        }
3593        SegmentDescriptor that = (SegmentDescriptor)o;
3594        if (this.segmentLength == that.segmentLength &&
3595            this.segmentOffset == that.segmentOffset &&
3596            this.segmentPathName.toString().equals(
3597              that.segmentPathName.toString())) {
3598          return true;
3599        }
3600        return false;
3601      }
3602
3603      @Override
3604      public int hashCode() {
3605        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3606      }
3607
3608      /** Fills up the rawKey object with the key returned by the Reader
3609       * @return true if there is a key returned; false, otherwise
3610       * @throws IOException
3611       */
3612      public boolean nextRawKey() throws IOException {
3613        if (in == null) {
3614          int bufferSize = getBufferSize(conf); 
3615          Reader reader = new Reader(conf,
3616                                     Reader.file(segmentPathName), 
3617                                     Reader.bufferSize(bufferSize),
3618                                     Reader.start(segmentOffset), 
3619                                     Reader.length(segmentLength));
3620        
3621          //sometimes we ignore syncs especially for temp merge files
3622          if (ignoreSync) reader.ignoreSync();
3623
3624          if (reader.getKeyClass() != keyClass)
3625            throw new IOException("wrong key class: " + reader.getKeyClass() +
3626                                  " is not " + keyClass);
3627          if (reader.getValueClass() != valClass)
3628            throw new IOException("wrong value class: "+reader.getValueClass()+
3629                                  " is not " + valClass);
3630          this.in = reader;
3631          rawKey = new DataOutputBuffer();
3632        }
3633        rawKey.reset();
3634        int keyLength = 
3635          in.nextRawKey(rawKey);
3636        return (keyLength >= 0);
3637      }
3638
3639      /** Fills up the passed rawValue with the value corresponding to the key
3640       * read earlier
3641       * @param rawValue
3642       * @return the length of the value
3643       * @throws IOException
3644       */
3645      public int nextRawValue(ValueBytes rawValue) throws IOException {
3646        int valLength = in.nextRawValue(rawValue);
3647        return valLength;
3648      }
3649      
3650      /** Returns the stored rawKey */
3651      public DataOutputBuffer getKey() {
3652        return rawKey;
3653      }
3654      
3655      /** closes the underlying reader */
3656      private void close() throws IOException {
3657        this.in.close();
3658        this.in = null;
3659      }
3660
3661      /** The default cleanup. Subclasses can override this with a custom 
3662       * cleanup 
3663       */
3664      public void cleanup() throws IOException {
3665        close();
3666        if (!preserveInput) {
3667          fs.delete(segmentPathName, true);
3668        }
3669      }
3670    } // SequenceFile.Sorter.SegmentDescriptor
3671    
3672    /** This class provisions multiple segments contained within a single
3673     *  file
3674     */
3675    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3676
3677      SegmentContainer parentContainer = null;
3678
3679      /** Constructs a segment
3680       * @param segmentOffset the offset of the segment in the file
3681       * @param segmentLength the length of the segment
3682       * @param segmentPathName the path name of the file containing the segment
3683       * @param parent the parent SegmentContainer that holds the segment
3684       */
3685      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3686                                       Path segmentPathName, SegmentContainer parent) {
3687        super(segmentOffset, segmentLength, segmentPathName);
3688        this.parentContainer = parent;
3689      }
3690      /** The default cleanup. Subclasses can override this with a custom 
3691       * cleanup 
3692       */
3693      @Override
3694      public void cleanup() throws IOException {
3695        super.close();
3696        if (super.shouldPreserveInput()) return;
3697        parentContainer.cleanup();
3698      }
3699      
3700      @Override
3701      public boolean equals(Object o) {
3702        if (!(o instanceof LinkedSegmentsDescriptor)) {
3703          return false;
3704        }
3705        return super.equals(o);
3706      }
3707    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3708
3709    /** The class that defines a container for segments to be merged. Primarily
3710     * required to delete temp files as soon as all the contained segments
3711     * have been looked at */
3712    private class SegmentContainer {
3713      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3714      private int numSegmentsContained; //# of segments contained
3715      private Path inName; //input file from where segments are created
3716      
3717      //the list of segments read from the file
3718      private ArrayList <SegmentDescriptor> segments = 
3719        new ArrayList <SegmentDescriptor>();
3720      /** This constructor is there primarily to serve the sort routine that 
3721       * generates a single output file with an associated index file */
3722      public SegmentContainer(Path inName, Path indexIn) throws IOException {
3723        //get the segments from indexIn
3724        FSDataInputStream fsIndexIn = fs.open(indexIn);
3725        long end = fs.getFileStatus(indexIn).getLen();
3726        while (fsIndexIn.getPos() < end) {
3727          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3728          long segmentLength = WritableUtils.readVLong(fsIndexIn);
3729          Path segmentName = inName;
3730          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3731                                                    segmentLength, segmentName, this));
3732        }
3733        fsIndexIn.close();
3734        fs.delete(indexIn, true);
3735        numSegmentsContained = segments.size();
3736        this.inName = inName;
3737      }
3738
3739      public List <SegmentDescriptor> getSegmentList() {
3740        return segments;
3741      }
3742      public void cleanup() throws IOException {
3743        numSegmentsCleanedUp++;
3744        if (numSegmentsCleanedUp == numSegmentsContained) {
3745          fs.delete(inName, true);
3746        }
3747      }
3748    } //SequenceFile.Sorter.SegmentContainer
3749
3750  } // SequenceFile.Sorter
3751
3752} // SequenceFile