/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/** 
 * <code>SequenceFile</code>s are flat files consisting of binary key/value 
 * pairs.
 * 
 * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
 * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
 * reading and sorting respectively.</p>
 * 
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp; 
 *                                      values are collected in 'blocks' 
 *                                      separately and compressed. The size of 
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 * 
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 * 
 * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
 * above <code>SequenceFile</code> formats.</p>
 *
 * <h4 id="Formats">SequenceFile Formats</h4>
 * 
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below.
 * 
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for 
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is 
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for  
 *                       compression of keys and/or values (if compression is 
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote end of the header.
 *   </li>
 * </ul>
 * 
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 * 
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every block.
 * </li>
 * </ul>
 * 
 * <p>The compressed blocks of key lengths and value lengths consist of the 
 * actual lengths of individual keys/values encoded in zero-compressed 
 * integer (VInt) format.</p>
 * 
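 * <p>As a minimal usage sketch (the file name, key/value types and
 * compression settings below are illustrative assumptions, not requirements):
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Path path = new Path("/tmp/example.seq");
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(IntWritable.class),
 *     SequenceFile.Writer.valueClass(Text.class),
 *     SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK));
 * try {
 *   writer.append(new IntWritable(1), new Text("one"));
 * } finally {
 *   writer.close();
 * }
 *
 * SequenceFile.Reader reader =
 *     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
 * try {
 *   IntWritable key = new IntWritable();
 *   Text value = new Text();
 *   while (reader.next(key, value)) {
 *     // process key/value
 *   }
 * } finally {
 *   reader.close();
 * }
 * }</pre>
 * 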
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points.*/
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
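  // = 100 * (4 + 16) = 2,000 bytes, i.e. a sync marker is emitted roughly
  // every 2,000 bytes of output (checked once per appended record).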

  /** 
   * The compression type used to compress key/value pairs in the 
   * {@link SequenceFile}.
   * 
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE, 
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }

  /**
   * Get the default compression type for sequence files from the
   * <code>io.seqfile.compression.type</code> configuration property.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  static public CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD : 
      CompressionType.valueOf(name);
  }
  
  /**
   * Set the default compression type for sequence files.
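   * <p>A small sketch (assuming {@code conf} and {@code path} are in scope;
   * both names are illustrative):
   * <pre>{@code
   * SequenceFile.setDefaultCompressionType(conf, CompressionType.BLOCK);
   * // writers created without an explicit compression option now use BLOCK
   * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *     SequenceFile.Writer.file(path),
   *     SequenceFile.Writer.keyClass(Text.class),
   *     SequenceFile.Writer.valueClass(Text.class));
   * }</pre>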
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  static public void setDefaultCompressionType(Configuration job, 
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
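   * <p>For example (a minimal sketch; the path, types and codec shown are
   * illustrative assumptions):
   * <pre>{@code
   * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
   *     SequenceFile.Writer.file(new Path("/tmp/data.seq")),
   *     SequenceFile.Writer.keyClass(LongWritable.class),
   *     SequenceFile.Writer.valueClass(Text.class),
   *     SequenceFile.Writer.compression(CompressionType.RECORD,
   *                                     new DefaultCodec()));
   * }</pre>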
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption = 
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType, 
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.bufferSize(bufferSize), 
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
  createWriter(FileSystem fs, Configuration conf, Path name,
               Class keyClass, Class valClass, int bufferSize,
               short replication, long blockSize, boolean createParent,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
  createWriter(FileContext fc, Configuration conf, Path name,
               Class keyClass, Class valClass,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata,
               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
               throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }
  
  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }
  

  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream the stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream. 
     * Note: this writes the stored bytes as-is; it will NOT compress them
     * if they are not already compressed.
     * @param outStream the stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     */
    public int getSize();
  }
  
  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    
    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }
    
    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
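      // mark the buffer invalid until readFully completes (it may throw
      // partway through the read)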
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      throw 
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes
  
  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      } 
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

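      // stream the decompressed value through a fixed 8 KB scratch buffer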
      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes
  
  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
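   * <p>A minimal sketch (the attribute names below are illustrative):
   * <pre>{@code
   * SequenceFile.Metadata metadata = new SequenceFile.Metadata();
   * metadata.set(new Text("created-by"), new Text("example-job"));
   * // attach it at creation time via SequenceFile.Writer.metadata(metadata)
   * }</pre>
   *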
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;
    
    public Metadata() {
      this(new TreeMap<Text, Text>());
    }
    
    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }
    
    public Text get(Text name) {
      return this.theMetadata.get(name);
    }
    
    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }
    
    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }    
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }
    
    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do 
    }
    
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
  
  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;

    private boolean appendMode = false;

    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;
    
    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
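    // For example (a hypothetical sketch), a reader that has sought to an
    // arbitrary byte offset can re-align on the next record boundary with:
    //   reader.sync(arbitraryOffset); // skip ahead to the next sync marker
    //   reader.next(key, value);      // now positioned at a record start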
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {                                       
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes(Charsets.UTF_8));
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}
    
    static class FileOption extends Options.PathOption 
                                    implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption 
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }
    
    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class AppendIfExistsOption extends Options.BooleanOption implements
        Option {
      AppendIfExistsOption(boolean value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                          implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }

    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }
    
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }
    
    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }
    
    public static Option replication(short value) {
      return new ReplicationOption(value);
    }
    
    public static Option appendIfExists(boolean value) {
      return new AppendIfExistsOption(value);
    }

    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }
    
    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }
    
    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }
    
    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }
    
    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf, 
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption = 
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption = 
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption = 
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption = 
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      AppendIfExistsOption appendIfExistsOption = Options.getOption(
          AppendIfExistsOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption = 
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption = 
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption = 
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException(
            "exactly one of file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ? 
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();

        if (appendIfExistsOption != null && appendIfExistsOption.getValue()
            && fs.exists(p)) {

          // Read the file and verify header details
          SequenceFile.Reader reader = new SequenceFile.Reader(conf,
              SequenceFile.Reader.file(p), new Reader.OnlyHeaderOption());
          try {

            if (keyClassOption.getValue() != reader.getKeyClass()
                || valueClassOption.getValue() != reader.getValueClass()) {
              throw new IllegalArgumentException(
                  "Key/value class provided does not match the file");
            }

            if (reader.getVersion() != VERSION[3]) {
              throw new VersionMismatchException(VERSION[3],
                  reader.getVersion());
            }

            if (metadataOption != null) {
              LOG.info("Metadata option is ignored during append");
            }
            metadataOption = (MetadataOption) SequenceFile.Writer
                .metadata(reader.getMetadata());

            CompressionOption readerCompressionOption = new CompressionOption(
                reader.getCompressionType(), reader.getCompressionCodec());

            if (readerCompressionOption.value != compressionTypeOption.value
                || !readerCompressionOption.codec.getClass().getName()
                    .equals(compressionTypeOption.codec.getClass().getName())) {
              throw new IllegalArgumentException(
                  "Compression option provided does not match the file");
            }

            sync = reader.getSync();

          } finally {
            reader.close();
          }

          out = fs.append(p, bufferSize, progress);
          this.appendMode = true;
        } else {
          out = fs
              .create(p, true, bufferSize, replication, blockSize, progress);
        }
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null, 
           new Metadata());
    }
    
    /** Create the named file with write-progress reporter.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }
    
    /** Create the named file with write-progress reporter. 
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
    
    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader() 
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());
      
      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());
      
      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }

    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata) 
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1246                + "properly configured, if you're using"
1247                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1257                + "properly configured, if you're using"
1258                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut = 
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1274                  + "properly configured, if you're using"
1275                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }

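      // When appending to an existing file the header is already present, so
      // write a sync marker at the append point instead of a second header.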
      if (appendMode) {
        sync();
      } else {
        writeFileHeader();
      }
    }
    
    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }
    
    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }

    /**
     * Flush all currently written data to the file system.
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.sync();                               // flush contents to file system
      }
    }

    @Override
    public void hsync() throws IOException {
      if (out != null) {
        out.hsync();
      }
    }

    @Override
    public void hflush() throws IOException {
      if (out != null) {
        out.hflush();
      }
    }
    
    /** Returns the configuration of this file. */
    Configuration getConf() { return conf; }
    
    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      keySerializer.close();
      uncompressedValSerializer.close();
      if (compressedValSerializer != null) {
        compressedValSerializer.close();
      }

      CodecPool.returnCompressor(compressor);
      compressor = null;
      
      if (out != null) {
        
        // Close the underlying stream iff we own it...
        if (ownOutputStream) {
          out.close();
        } else {
          out.flush();
        }
        out = null;
      }
    }

    synchronized void checkAndWriteSync() throws IOException {
      if (sync != null &&
          out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
        sync();
      }
    }

    /** Append a key/value pair. */
    public void append(Writable key, Writable val)
      throws IOException {
      append((Object) key, (Object) val);
    }

    /** Append a key/value pair. */
    @SuppressWarnings("unchecked")
    public synchronized void append(Object key, Object val)
      throws IOException {
      if (key.getClass() != keyClass)
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      if (val.getClass() != valClass)
        throw new IOException("wrong value class: "+val.getClass().getName()
                              +" is not "+valClass);

      buffer.reset();

      // Append the 'key'
      keySerializer.serialize(key);
      int keyLength = buffer.getLength();
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + key);

      // Append the 'value'
      if (compress == CompressionType.RECORD) {
        deflateFilter.resetState();
        compressedValSerializer.serialize(val);
        deflateOut.flush();
        deflateFilter.finish();
      } else {
        uncompressedValSerializer.serialize(val);
      }

      // Write the record out
      checkAndWriteSync();                                // sync
      out.writeInt(buffer.getLength());                   // total record length
      out.writeInt(keyLength);                            // key portion length
      out.write(buffer.getData(), 0, buffer.getLength()); // data
    }

    public synchronized void appendRaw(byte[] keyData, int keyOffset,
        int keyLength, ValueBytes val) throws IOException {
      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);

      int valLength = val.getSize();

      checkAndWriteSync();
      
      out.writeInt(keyLength+valLength);          // total record length
      out.writeInt(keyLength);                    // key portion length
      out.write(keyData, keyOffset, keyLength);   // key
      val.writeUncompressedBytes(out);            // value
    }

    /** Returns the current length of the output file.
     *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
     * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
     * the key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first key
     * in the block that was being written when this method was called).
     */
1430    public synchronized long getLength() throws IOException {
1431      return out.getPos();
1432    }
1433
1434  } // class Writer
1435
1436  /** Write key/compressed-value pairs to a sequence-format file. */
1437  static class RecordCompressWriter extends Writer {
1438    
1439    RecordCompressWriter(Configuration conf, 
1440                         Option... options) throws IOException {
1441      super(conf, options);
1442    }
1443
1444    /** Append a key/value pair. */
1445    @Override
1446    @SuppressWarnings("unchecked")
1447    public synchronized void append(Object key, Object val)
1448      throws IOException {
1449      if (key.getClass() != keyClass)
1450        throw new IOException("wrong key class: "+key.getClass().getName()
1451                              +" is not "+keyClass);
1452      if (val.getClass() != valClass)
1453        throw new IOException("wrong value class: "+val.getClass().getName()
1454                              +" is not "+valClass);
1455
1456      buffer.reset();
1457
1458      // Append the 'key'
1459      keySerializer.serialize(key);
1460      int keyLength = buffer.getLength();
1461      if (keyLength < 0)
1462        throw new IOException("negative length keys not allowed: " + key);
1463
1464      // Compress 'value' and append it
1465      deflateFilter.resetState();
1466      compressedValSerializer.serialize(val);
1467      deflateOut.flush();
1468      deflateFilter.finish();
1469
1470      // Write the record out
1471      checkAndWriteSync();                                // sync
1472      out.writeInt(buffer.getLength());                   // total record length
1473      out.writeInt(keyLength);                            // key portion length
1474      out.write(buffer.getData(), 0, buffer.getLength()); // data
1475    }
1476
    /** Append a 'raw' key/value pair; the value bytes are written as-is. */
1478    @Override
1479    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1480        int keyLength, ValueBytes val) throws IOException {
1481
1482      if (keyLength < 0)
1483        throw new IOException("negative length keys not allowed: " + keyLength);
1484
1485      int valLength = val.getSize();
1486      
1487      checkAndWriteSync();                        // sync
1488      out.writeInt(keyLength+valLength);          // total record length
1489      out.writeInt(keyLength);                    // key portion length
1490      out.write(keyData, keyOffset, keyLength);   // 'key' data
1491      val.writeCompressedBytes(out);              // 'value' data
1492    }
1493    
  } // RecordCompressWriter
1495
1496  /** Write compressed key/value blocks to a sequence-format file. */
1497  static class BlockCompressWriter extends Writer {
1498    
1499    private int noBufferedRecords = 0;
1500    
1501    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1502    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1503
1504    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1505    private DataOutputBuffer valBuffer = new DataOutputBuffer();
1506
1507    private final int compressionBlockSize;
1508    
1509    BlockCompressWriter(Configuration conf,
1510                        Option... options) throws IOException {
1511      super(conf, options);
1512      compressionBlockSize = 
1513        conf.getInt("io.seqfile.compress.blocksize", 1000000);
1514      keySerializer.close();
1515      keySerializer.open(keyBuffer);
1516      uncompressedValSerializer.close();
1517      uncompressedValSerializer.open(valBuffer);
1518    }
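    // A sketch of tuning the flush threshold (the value below is an
    // illustrative assumption, not a recommendation):
    //
    //   conf.setInt("io.seqfile.compress.blocksize", 4 * 1024 * 1024);
    //
    // Buffered key/value bytes are compressed and written out as one block
    // once they reach this size (see append/sync below).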
1519
    /** Workhorse: compress the given buffer and write out its compressed length and contents. */
1521    private synchronized 
1522      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1523      throws IOException {
1524      deflateFilter.resetState();
1525      buffer.reset();
1526      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1527                       uncompressedDataBuffer.getLength());
1528      deflateOut.flush();
1529      deflateFilter.finish();
1530      
1531      WritableUtils.writeVInt(out, buffer.getLength());
1532      out.write(buffer.getData(), 0, buffer.getLength());
1533    }
1534    
    /** Compress and flush buffered records to the underlying file system. */
1536    @Override
1537    public synchronized void sync() throws IOException {
1538      if (noBufferedRecords > 0) {
1539        super.sync();
1540        
1541        // No. of records
1542        WritableUtils.writeVInt(out, noBufferedRecords);
1543        
1544        // Write 'keys' and lengths
1545        writeBuffer(keyLenBuffer);
1546        writeBuffer(keyBuffer);
1547        
1548        // Write 'values' and lengths
1549        writeBuffer(valLenBuffer);
1550        writeBuffer(valBuffer);
1551        
1552        // Flush the file-stream
1553        out.flush();
1554        
1555        // Reset internal states
1556        keyLenBuffer.reset();
1557        keyBuffer.reset();
1558        valLenBuffer.reset();
1559        valBuffer.reset();
1560        noBufferedRecords = 0;
1561      }
1562      
1563    }
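    // On-disk layout of one block, as written above (informal sketch):
    //
    //   [sync escape + sync bytes]
    //   [vint record count]
    //   [vint length + compressed key-lengths][vint length + compressed keys]
    //   [vint length + compressed value-lengths]
    //   [vint length + compressed values]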
1564    
1565    /** Close the file. */
1566    @Override
1567    public synchronized void close() throws IOException {
1568      if (out != null) {
1569        sync();
1570      }
1571      super.close();
1572    }
1573
1574    /** Append a key/value pair. */
1575    @Override
1576    @SuppressWarnings("unchecked")
1577    public synchronized void append(Object key, Object val)
1578      throws IOException {
1579      if (key.getClass() != keyClass)
1580        throw new IOException("wrong key class: "+key+" is not "+keyClass);
1581      if (val.getClass() != valClass)
1582        throw new IOException("wrong value class: "+val+" is not "+valClass);
1583
1584      // Save key/value into respective buffers 
1585      int oldKeyLength = keyBuffer.getLength();
1586      keySerializer.serialize(key);
1587      int keyLength = keyBuffer.getLength() - oldKeyLength;
1588      if (keyLength < 0)
1589        throw new IOException("negative length keys not allowed: " + key);
1590      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1591
1592      int oldValLength = valBuffer.getLength();
1593      uncompressedValSerializer.serialize(val);
1594      int valLength = valBuffer.getLength() - oldValLength;
1595      WritableUtils.writeVInt(valLenBuffer, valLength);
1596      
1597      // Added another key/value pair
1598      ++noBufferedRecords;
1599      
1600      // Compress and flush?
1601      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1602      if (currentBlockSize >= compressionBlockSize) {
1603        sync();
1604      }
1605    }
1606    
    /** Append a 'raw' key/value pair. */
1608    @Override
1609    public synchronized void appendRaw(byte[] keyData, int keyOffset,
1610        int keyLength, ValueBytes val) throws IOException {
1611      
1612      if (keyLength < 0)
        throw new IOException("negative length keys not allowed: " + keyLength);
1614
1615      int valLength = val.getSize();
1616      
1617      // Save key/value data in relevant buffers
1618      WritableUtils.writeVInt(keyLenBuffer, keyLength);
1619      keyBuffer.write(keyData, keyOffset, keyLength);
1620      WritableUtils.writeVInt(valLenBuffer, valLength);
1621      val.writeUncompressedBytes(valBuffer);
1622
1623      // Added another key/value pair
1624      ++noBufferedRecords;
1625
1626      // Compress and flush?
1627      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1628      if (currentBlockSize >= compressionBlockSize) {
1629        sync();
1630      }
1631    }
1632  
  } // BlockCompressWriter
1634
1635  /** Get the configured buffer size */
1636  private static int getBufferSize(Configuration conf) {
1637    return conf.getInt("io.file.buffer.size", 4096);
1638  }
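  // A sketch of overriding the default read buffer size (illustrative
  // value only):
  //
  //   conf.setInt("io.file.buffer.size", 64 * 1024);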
1639
1640  /** Reads key/value pairs from a sequence-format file. */
1641  public static class Reader implements java.io.Closeable {
1642    private String filename;
1643    private FSDataInputStream in;
1644    private DataOutputBuffer outBuf = new DataOutputBuffer();
1645
1646    private byte version;
1647
1648    private String keyClassName;
1649    private String valClassName;
1650    private Class keyClass;
1651    private Class valClass;
1652
1653    private CompressionCodec codec = null;
1654    private Metadata metadata = null;
1655    
1656    private byte[] sync = new byte[SYNC_HASH_SIZE];
1657    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1658    private boolean syncSeen;
1659
1660    private long headerEnd;
1661    private long end;
1662    private int keyLength;
1663    private int recordLength;
1664
1665    private boolean decompress;
1666    private boolean blockCompressed;
1667    
1668    private Configuration conf;
1669
1670    private int noBufferedRecords = 0;
1671    private boolean lazyDecompress = true;
1672    private boolean valuesDecompressed = true;
1673    
1674    private int noBufferedKeys = 0;
1675    private int noBufferedValues = 0;
1676    
1677    private DataInputBuffer keyLenBuffer = null;
1678    private CompressionInputStream keyLenInFilter = null;
1679    private DataInputStream keyLenIn = null;
1680    private Decompressor keyLenDecompressor = null;
1681    private DataInputBuffer keyBuffer = null;
1682    private CompressionInputStream keyInFilter = null;
1683    private DataInputStream keyIn = null;
1684    private Decompressor keyDecompressor = null;
1685
1686    private DataInputBuffer valLenBuffer = null;
1687    private CompressionInputStream valLenInFilter = null;
1688    private DataInputStream valLenIn = null;
1689    private Decompressor valLenDecompressor = null;
1690    private DataInputBuffer valBuffer = null;
1691    private CompressionInputStream valInFilter = null;
1692    private DataInputStream valIn = null;
1693    private Decompressor valDecompressor = null;
1694    
1695    private Deserializer keyDeserializer;
1696    private Deserializer valDeserializer;
1697
1698    /**
1699     * A tag interface for all of the Reader options
1700     */
1701    public static interface Option {}
1702    
1703    /**
1704     * Create an option to specify the path name of the sequence file.
1705     * @param value the path to read
1706     * @return a new option
1707     */
1708    public static Option file(Path value) {
1709      return new FileOption(value);
1710    }
1711    
1712    /**
1713     * Create an option to specify the stream with the sequence file.
1714     * @param value the stream to read.
1715     * @return a new option
1716     */
1717    public static Option stream(FSDataInputStream value) {
1718      return new InputStreamOption(value);
1719    }
1720    
1721    /**
1722     * Create an option to specify the starting byte to read.
1723     * @param value the number of bytes to skip over
1724     * @return a new option
1725     */
1726    public static Option start(long value) {
1727      return new StartOption(value);
1728    }
1729    
1730    /**
1731     * Create an option to specify the number of bytes to read.
1732     * @param value the number of bytes to read
1733     * @return a new option
1734     */
1735    public static Option length(long value) {
1736      return new LengthOption(value);
1737    }
1738    
1739    /**
1740     * Create an option with the buffer size for reading the given pathname.
1741     * @param value the number of bytes to buffer
1742     * @return a new option
1743     */
1744    public static Option bufferSize(int value) {
1745      return new BufferSizeOption(value);
1746    }
1747
1748    private static class FileOption extends Options.PathOption 
1749                                    implements Option {
1750      private FileOption(Path value) {
1751        super(value);
1752      }
1753    }
1754    
1755    private static class InputStreamOption
1756        extends Options.FSDataInputStreamOption 
1757        implements Option {
1758      private InputStreamOption(FSDataInputStream value) {
1759        super(value);
1760      }
1761    }
1762
1763    private static class StartOption extends Options.LongOption
1764                                     implements Option {
1765      private StartOption(long value) {
1766        super(value);
1767      }
1768    }
1769
1770    private static class LengthOption extends Options.LongOption
1771                                      implements Option {
1772      private LengthOption(long value) {
1773        super(value);
1774      }
1775    }
1776
1777    private static class BufferSizeOption extends Options.IntegerOption
1778                                      implements Option {
1779      private BufferSizeOption(int value) {
1780        super(value);
1781      }
1782    }
1783
    // only used internally; never passed in by callers
1785    private static class OnlyHeaderOption extends Options.BooleanOption 
1786                                          implements Option {
1787      private OnlyHeaderOption() {
1788        super(true);
1789      }
1790    }
1791
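    /**
     * Construct a reader using the supplied options. A minimal usage sketch
     * (the path is hypothetical):
     * <pre>{@code
     * SequenceFile.Reader reader = new SequenceFile.Reader(conf,
     *     SequenceFile.Reader.file(new Path("/tmp/data.seq")));
     * }</pre>
     * @param conf the configuration used to access the file system
     * @param opts the options controlling how the file is opened
     * @throws IOException if the file or stream cannot be opened
     */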
1792    public Reader(Configuration conf, Option... opts) throws IOException {
1793      // Look up the options, these are null if not set
1794      FileOption fileOpt = Options.getOption(FileOption.class, opts);
1795      InputStreamOption streamOpt = 
1796        Options.getOption(InputStreamOption.class, opts);
1797      StartOption startOpt = Options.getOption(StartOption.class, opts);
1798      LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1799      BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1800      OnlyHeaderOption headerOnly = 
1801        Options.getOption(OnlyHeaderOption.class, opts);
1802      // check for consistency
1803      if ((fileOpt == null) == (streamOpt == null)) {
1804        throw new 
          IllegalArgumentException("Exactly one of file or stream must be specified");
1806      }
1807      if (fileOpt == null && bufOpt != null) {
1808        throw new IllegalArgumentException("buffer size can only be set when" +
1809                                           " a file is specified.");
1810      }
1811      // figure out the real values
1812      Path filename = null;
1813      FSDataInputStream file;
1814      final long len;
1815      if (fileOpt != null) {
1816        filename = fileOpt.getValue();
1817        FileSystem fs = filename.getFileSystem(conf);
1818        int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1819        len = null == lenOpt
1820          ? fs.getFileStatus(filename).getLen()
1821          : lenOpt.getValue();
1822        file = openFile(fs, filename, bufSize, len);
1823      } else {
1824        len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1825        file = streamOpt.getValue();
1826      }
1827      long start = startOpt == null ? 0 : startOpt.getValue();
1828      // really set up
1829      initialize(filename, file, start, len, conf, headerOnly != null);
1830    }
1831
1832    /**
1833     * Construct a reader by opening a file from the given file system.
1834     * @param fs The file system used to open the file.
1835     * @param file The file being read.
1836     * @param conf Configuration
1837     * @throws IOException
1838     * @deprecated Use Reader(Configuration, Option...) instead.
1839     */
1840    @Deprecated
1841    public Reader(FileSystem fs, Path file, 
1842                  Configuration conf) throws IOException {
1843      this(conf, file(file.makeQualified(fs)));
1844    }
1845
1846    /**
1847     * Construct a reader by the given input stream.
1848     * @param in An input stream.
1849     * @param buffersize unused
1850     * @param start The starting position.
1851     * @param length The length being read.
1852     * @param conf Configuration
1853     * @throws IOException
1854     * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1855     */
1856    @Deprecated
1857    public Reader(FSDataInputStream in, int buffersize,
1858        long start, long length, Configuration conf) throws IOException {
1859      this(conf, stream(in), start(start), length(length));
1860    }
1861
1862    /** Common work of the constructors. */
1863    private void initialize(Path filename, FSDataInputStream in,
1864                            long start, long length, Configuration conf,
1865                            boolean tempReader) throws IOException {
1866      if (in == null) {
1867        throw new IllegalArgumentException("in == null");
1868      }
1869      this.filename = filename == null ? "<unknown>" : filename.toString();
1870      this.in = in;
1871      this.conf = conf;
1872      boolean succeeded = false;
1873      try {
1874        seek(start);
1875        this.end = this.in.getPos() + length;
1876        // if it wrapped around, use the max
1877        if (end < length) {
1878          end = Long.MAX_VALUE;
1879        }
1880        init(tempReader);
1881        succeeded = true;
1882      } finally {
1883        if (!succeeded) {
1884          IOUtils.cleanup(LOG, this.in);
1885        }
1886      }
1887    }
1888
1889    /**
1890     * Override this method to specialize the type of
1891     * {@link FSDataInputStream} returned.
1892     * @param fs The file system used to open the file.
1893     * @param file The file being read.
1894     * @param bufferSize The buffer size used to read the file.
1895     * @param length The length being read if it is >= 0.  Otherwise,
1896     *               the length is not available.
1897     * @return The opened stream.
1898     * @throws IOException
1899     */
1900    protected FSDataInputStream openFile(FileSystem fs, Path file,
1901        int bufferSize, long length) throws IOException {
1902      return fs.open(file, bufferSize);
1903    }
1904    
1905    /**
1906     * Initialize the {@link Reader}
     * @param tempReader <code>true</code> if we are constructing a temporary
     *                  reader (see {@link SequenceFile.Sorter#cloneFileAttributes}),
1909     *                  and hence do not initialize every component; 
1910     *                  <code>false</code> otherwise.
1911     * @throws IOException
1912     */
1913    private void init(boolean tempReader) throws IOException {
1914      byte[] versionBlock = new byte[VERSION.length];
1915      in.readFully(versionBlock);
1916
1917      if ((versionBlock[0] != VERSION[0]) ||
1918          (versionBlock[1] != VERSION[1]) ||
1919          (versionBlock[2] != VERSION[2]))
1920        throw new IOException(this + " not a SequenceFile");
1921
1922      // Set 'version'
1923      version = versionBlock[3];
1924      if (version > VERSION[3])
1925        throw new VersionMismatchException(VERSION[3], version);
1926
1927      if (version < BLOCK_COMPRESS_VERSION) {
1928        UTF8 className = new UTF8();
1929
1930        className.readFields(in);
1931        keyClassName = className.toStringChecked(); // key class name
1932
1933        className.readFields(in);
1934        valClassName = className.toStringChecked(); // val class name
1935      } else {
1936        keyClassName = Text.readString(in);
1937        valClassName = Text.readString(in);
1938      }
1939
1940      if (version > 2) {                          // if version > 2
1941        this.decompress = in.readBoolean();       // is compressed?
1942      } else {
1943        decompress = false;
1944      }
1945
1946      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1947        this.blockCompressed = in.readBoolean();  // is block-compressed?
1948      } else {
1949        blockCompressed = false;
1950      }
1951      
1952      // if version >= 5
1953      // setup the compression codec
1954      if (decompress) {
1955        if (version >= CUSTOM_COMPRESS_VERSION) {
1956          String codecClassname = Text.readString(in);
1957          try {
1958            Class<? extends CompressionCodec> codecClass
1959              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1960            this.codec = ReflectionUtils.newInstance(codecClass, conf);
1961          } catch (ClassNotFoundException cnfe) {
1962            throw new IllegalArgumentException("Unknown codec: " + 
1963                                               codecClassname, cnfe);
1964          }
1965        } else {
1966          codec = new DefaultCodec();
1967          ((Configurable)codec).setConf(conf);
1968        }
1969      }
1970      
1971      this.metadata = new Metadata();
1972      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1973        this.metadata.readFields(in);
1974      }
1975      
1976      if (version > 1) {                          // if version > 1
1977        in.readFully(sync);                       // read sync bytes
1978        headerEnd = in.getPos();                  // record end of header
1979      }
1980      
      // Initialize... *not* if we are constructing a temporary Reader
1982      if (!tempReader) {
1983        valBuffer = new DataInputBuffer();
1984        if (decompress) {
1985          valDecompressor = CodecPool.getDecompressor(codec);
1986          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1987          valIn = new DataInputStream(valInFilter);
1988        } else {
1989          valIn = valBuffer;
1990        }
1991
1992        if (blockCompressed) {
1993          keyLenBuffer = new DataInputBuffer();
1994          keyBuffer = new DataInputBuffer();
1995          valLenBuffer = new DataInputBuffer();
1996
1997          keyLenDecompressor = CodecPool.getDecompressor(codec);
1998          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
1999                                                   keyLenDecompressor);
2000          keyLenIn = new DataInputStream(keyLenInFilter);
2001
2002          keyDecompressor = CodecPool.getDecompressor(codec);
2003          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
2004          keyIn = new DataInputStream(keyInFilter);
2005
2006          valLenDecompressor = CodecPool.getDecompressor(codec);
2007          valLenInFilter = codec.createInputStream(valLenBuffer, 
2008                                                   valLenDecompressor);
2009          valLenIn = new DataInputStream(valLenInFilter);
2010        }
2011        
2012        SerializationFactory serializationFactory =
2013          new SerializationFactory(conf);
2014        this.keyDeserializer =
2015          getDeserializer(serializationFactory, getKeyClass());
2016        if (this.keyDeserializer == null) {
2017          throw new IOException(
2018              "Could not find a deserializer for the Key class: '"
2019                  + getKeyClass().getCanonicalName() + "'. "
2020                  + "Please ensure that the configuration '" +
2021                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2022                  + "properly configured, if you're using "
2023                  + "custom serialization.");
2024        }
2025        if (!blockCompressed) {
2026          this.keyDeserializer.open(valBuffer);
2027        } else {
2028          this.keyDeserializer.open(keyIn);
2029        }
2030        this.valDeserializer =
2031          getDeserializer(serializationFactory, getValueClass());
2032        if (this.valDeserializer == null) {
2033          throw new IOException(
2034              "Could not find a deserializer for the Value class: '"
2035                  + getValueClass().getCanonicalName() + "'. "
2036                  + "Please ensure that the configuration '" +
2037                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
2038                  + "properly configured, if you're using "
2039                  + "custom serialization.");
2040        }
2041        this.valDeserializer.open(valIn);
2042      }
2043    }
2044    
2045    @SuppressWarnings("unchecked")
2046    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
2047      return sf.getDeserializer(c);
2048    }
2049    
2050    /** Close the file. */
2051    @Override
2052    public synchronized void close() throws IOException {
2053      // Return the decompressors to the pool
2054      CodecPool.returnDecompressor(keyLenDecompressor);
2055      CodecPool.returnDecompressor(keyDecompressor);
2056      CodecPool.returnDecompressor(valLenDecompressor);
2057      CodecPool.returnDecompressor(valDecompressor);
2058      keyLenDecompressor = keyDecompressor = null;
2059      valLenDecompressor = valDecompressor = null;
2060      
2061      if (keyDeserializer != null) {
2062        keyDeserializer.close();
2063      }
2064      if (valDeserializer != null) {
2065        valDeserializer.close();
2066      }
2067      
2068      // Close the input-stream
2069      in.close();
2070    }
2071
2072    /** Returns the name of the key class. */
2073    public String getKeyClassName() {
2074      return keyClassName;
2075    }
2076
2077    /** Returns the class of keys in this file. */
2078    public synchronized Class<?> getKeyClass() {
2079      if (null == keyClass) {
2080        try {
2081          keyClass = WritableName.getClass(getKeyClassName(), conf);
2082        } catch (IOException e) {
2083          throw new RuntimeException(e);
2084        }
2085      }
2086      return keyClass;
2087    }
2088
2089    /** Returns the name of the value class. */
2090    public String getValueClassName() {
2091      return valClassName;
2092    }
2093
2094    /** Returns the class of values in this file. */
2095    public synchronized Class<?> getValueClass() {
2096      if (null == valClass) {
2097        try {
2098          valClass = WritableName.getClass(getValueClassName(), conf);
2099        } catch (IOException e) {
2100          throw new RuntimeException(e);
2101        }
2102      }
2103      return valClass;
2104    }
2105
2106    /** Returns true if values are compressed. */
2107    public boolean isCompressed() { return decompress; }
2108    
2109    /** Returns true if records are block-compressed. */
2110    public boolean isBlockCompressed() { return blockCompressed; }
2111    
2112    /** Returns the compression codec of data in this file. */
2113    public CompressionCodec getCompressionCodec() { return codec; }
2114    
2115    private byte[] getSync() {
2116      return sync;
2117    }
2118
2119    private byte getVersion() {
2120      return version;
2121    }
2122
2123    /**
2124     * Get the compression type for this file.
2125     * @return the compression type
2126     */
2127    public CompressionType getCompressionType() {
2128      if (decompress) {
2129        return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2130      } else {
2131        return CompressionType.NONE;
2132      }
2133    }
2134
2135    /** Returns the metadata object of the file */
2136    public Metadata getMetadata() {
2137      return this.metadata;
2138    }
2139    
2140    /** Returns the configuration used for this file. */
2141    Configuration getConf() { return conf; }
2142    
2143    /** Read a compressed buffer */
2144    private synchronized void readBuffer(DataInputBuffer buffer, 
2145                                         CompressionInputStream filter) throws IOException {
2146      // Read data into a temporary buffer
2147      DataOutputBuffer dataBuffer = new DataOutputBuffer();
2148
2149      try {
2150        int dataBufferLength = WritableUtils.readVInt(in);
2151        dataBuffer.write(in, dataBufferLength);
2152      
2153        // Set up 'buffer' connected to the input-stream
2154        buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2155      } finally {
2156        dataBuffer.close();
2157      }
2158
2159      // Reset the codec
2160      filter.resetState();
2161    }
2162    
2163    /** Read the next 'compressed' block */
2164    private synchronized void readBlock() throws IOException {
2165      // Check if we need to throw away a whole block of 
2166      // 'values' due to 'lazy decompression' 
2167      if (lazyDecompress && !valuesDecompressed) {
2168        in.seek(WritableUtils.readVInt(in)+in.getPos());
2169        in.seek(WritableUtils.readVInt(in)+in.getPos());
2170      }
2171      
2172      // Reset internal states
2173      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2174      valuesDecompressed = false;
2175
      //Process sync
      if (sync != null) {
        in.readInt();                           // read the sync escape
        in.readFully(syncCheck);                // read syncCheck
        if (!Arrays.equals(sync, syncCheck))    // check it
          throw new IOException("File is corrupt!");
        syncSeen = true;                        // a verified sync was consumed
      }
2184
2185      // Read number of records in this block
2186      noBufferedRecords = WritableUtils.readVInt(in);
2187      
2188      // Read key lengths and keys
2189      readBuffer(keyLenBuffer, keyLenInFilter);
2190      readBuffer(keyBuffer, keyInFilter);
2191      noBufferedKeys = noBufferedRecords;
2192      
2193      // Read value lengths and values
2194      if (!lazyDecompress) {
2195        readBuffer(valLenBuffer, valLenInFilter);
2196        readBuffer(valBuffer, valInFilter);
2197        noBufferedValues = noBufferedRecords;
2198        valuesDecompressed = true;
2199      }
2200    }
2201
2202    /** 
2203     * Position valLenIn/valIn to the 'value' 
2204     * corresponding to the 'current' key 
2205     */
2206    private synchronized void seekToCurrentValue() throws IOException {
2207      if (!blockCompressed) {
2208        if (decompress) {
2209          valInFilter.resetState();
2210        }
2211        valBuffer.reset();
2212      } else {
2213        // Check if this is the first value in the 'block' to be read
2214        if (lazyDecompress && !valuesDecompressed) {
2215          // Read the value lengths and values
2216          readBuffer(valLenBuffer, valLenInFilter);
2217          readBuffer(valBuffer, valInFilter);
2218          noBufferedValues = noBufferedRecords;
2219          valuesDecompressed = true;
2220        }
2221        
2222        // Calculate the no. of bytes to skip
2223        // Note: 'current' key has already been read!
2224        int skipValBytes = 0;
2225        int currentKey = noBufferedKeys + 1;          
2226        for (int i=noBufferedValues; i > currentKey; --i) {
2227          skipValBytes += WritableUtils.readVInt(valLenIn);
2228          --noBufferedValues;
2229        }
2230        
2231        // Skip to the 'val' corresponding to 'current' key
2232        if (skipValBytes > 0) {
2233          if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2234            throw new IOException("Failed to seek to " + currentKey + 
2235                                  "(th) value!");
2236          }
2237        }
2238      }
2239    }
2240
2241    /**
2242     * Get the 'value' corresponding to the last read 'key'.
2243     * @param val : The 'value' to be read.
2244     * @throws IOException
2245     */
2246    public synchronized void getCurrentValue(Writable val) 
2247      throws IOException {
2248      if (val instanceof Configurable) {
2249        ((Configurable) val).setConf(this.conf);
2250      }
2251
2252      // Position stream to 'current' value
2253      seekToCurrentValue();
2254
2255      if (!blockCompressed) {
2256        val.readFields(valIn);
2257        
2258        if (valIn.read() > 0) {
2259          LOG.info("available bytes: " + valIn.available());
2260          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2261                                + " bytes, should read " +
2262                                (valBuffer.getLength()-keyLength));
2263        }
2264      } else {
2265        // Get the value
2266        int valLength = WritableUtils.readVInt(valLenIn);
2267        val.readFields(valIn);
2268        
2269        // Read another compressed 'value'
2270        --noBufferedValues;
2271        
        // Sanity check: lengths should never be negative
        if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " has a negative value length: " + valLength);
        }
2276      }
2277
2278    }
2279    
2280    /**
2281     * Get the 'value' corresponding to the last read 'key'.
2282     * @param val : The 'value' to be read.
2283     * @throws IOException
2284     */
2285    public synchronized Object getCurrentValue(Object val) 
2286      throws IOException {
2287      if (val instanceof Configurable) {
2288        ((Configurable) val).setConf(this.conf);
2289      }
2290
2291      // Position stream to 'current' value
2292      seekToCurrentValue();
2293
2294      if (!blockCompressed) {
2295        val = deserializeValue(val);
2296        
2297        if (valIn.read() > 0) {
2298          LOG.info("available bytes: " + valIn.available());
2299          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2300                                + " bytes, should read " +
2301                                (valBuffer.getLength()-keyLength));
2302        }
2303      } else {
2304        // Get the value
2305        int valLength = WritableUtils.readVInt(valLenIn);
2306        val = deserializeValue(val);
2307        
2308        // Read another compressed 'value'
2309        --noBufferedValues;
2310        
        // Sanity check: lengths should never be negative
        if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " has a negative value length: " + valLength);
        }
2315      }
2316      return val;
2317
2318    }
2319
2320    @SuppressWarnings("unchecked")
2321    private Object deserializeValue(Object val) throws IOException {
2322      return valDeserializer.deserialize(val);
2323    }
2324    
2325    /** Read the next key in the file into <code>key</code>, skipping its
2326     * value.  True if another entry exists, and false at end of file. */
2327    public synchronized boolean next(Writable key) throws IOException {
2328      if (key.getClass() != getKeyClass())
2329        throw new IOException("wrong key class: "+key.getClass().getName()
2330                              +" is not "+keyClass);
2331
2332      if (!blockCompressed) {
2333        outBuf.reset();
2334        
2335        keyLength = next(outBuf);
2336        if (keyLength < 0)
2337          return false;
2338        
2339        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2340        
2341        key.readFields(valBuffer);
2342        valBuffer.mark(0);
2343        if (valBuffer.getPosition() != keyLength)
2344          throw new IOException(key + " read " + valBuffer.getPosition()
2345                                + " bytes, should read " + keyLength);
2346      } else {
2347        //Reset syncSeen
2348        syncSeen = false;
2349        
2350        if (noBufferedKeys == 0) {
2351          try {
2352            readBlock();
2353          } catch (EOFException eof) {
2354            return false;
2355          }
2356        }
2357        
2358        int keyLength = WritableUtils.readVInt(keyLenIn);
2359        
2360        // Sanity check
2361        if (keyLength < 0) {
2362          return false;
2363        }
2364        
2365        //Read another compressed 'key'
2366        key.readFields(keyIn);
2367        --noBufferedKeys;
2368      }
2369
2370      return true;
2371    }
2372
2373    /** Read the next key/value pair in the file into <code>key</code> and
2374     * <code>val</code>.  Returns true if such a pair exists and false when at
     * end of file. */
2376    public synchronized boolean next(Writable key, Writable val)
2377      throws IOException {
2378      if (val.getClass() != getValueClass())
2379        throw new IOException("wrong value class: "+val+" is not "+valClass);
2380
2381      boolean more = next(key);
2382      
2383      if (more) {
2384        getCurrentValue(val);
2385      }
2386
2387      return more;
2388    }
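    // A typical read loop (illustrative sketch; the key and value types
    // must match the classes recorded in the file header):
    //
    //   Text key = new Text();
    //   Text val = new Text();
    //   while (reader.next(key, val)) {
    //     // process key/val
    //   }
    //   reader.close();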
2389    
2390    /**
2391     * Read and return the next record length, potentially skipping over 
2392     * a sync block.
2393     * @return the length of the next record or -1 if there is no next record
2394     * @throws IOException
2395     */
2396    private synchronized int readRecordLength() throws IOException {
2397      if (in.getPos() >= end) {
2398        return -1;
2399      }      
2400      int length = in.readInt();
2401      if (version > 1 && sync != null &&
2402          length == SYNC_ESCAPE) {              // process a sync entry
2403        in.readFully(syncCheck);                // read syncCheck
2404        if (!Arrays.equals(sync, syncCheck))    // check it
2405          throw new IOException("File is corrupt!");
2406        syncSeen = true;
2407        if (in.getPos() >= end) {
2408          return -1;
2409        }
2410        length = in.readInt();                  // re-read length
2411      } else {
2412        syncSeen = false;
2413      }
2414      
2415      return length;
2416    }
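    // Record stream layout consumed above (informal sketch):
    //
    //   [int length == SYNC_ESCAPE][SYNC_HASH_SIZE sync bytes]  <- sync entry
    //   [int record length][int key length][key bytes][value bytes]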
2417    
    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method.
     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
     */
    @Deprecated
2424    synchronized int next(DataOutputBuffer buffer) throws IOException {
2425      // Unsupported for block-compressed sequence files
2426      if (blockCompressed) {
        throw new IOException("Unsupported call for block-compressed" +
                              " SequenceFiles - use SequenceFile.Reader.nextRaw(DataOutputBuffer, ValueBytes)");
2429      }
2430      try {
2431        int length = readRecordLength();
2432        if (length == -1) {
2433          return -1;
2434        }
2435        int keyLength = in.readInt();
2436        buffer.write(in, length);
2437        return keyLength;
2438      } catch (ChecksumException e) {             // checksum failure
2439        handleChecksumException(e);
2440        return next(buffer);
2441      }
2442    }
2443
2444    public ValueBytes createValueBytes() {
2445      ValueBytes val = null;
2446      if (!decompress || blockCompressed) {
2447        val = new UncompressedBytes();
2448      } else {
2449        val = new CompressedBytes(codec);
2450      }
2451      return val;
2452    }
2453
2454    /**
2455     * Read 'raw' records.
2456     * @param key - The buffer into which the key is read
2457     * @param val - The 'raw' value
2458     * @return Returns the total record length or -1 for end of file
2459     * @throws IOException
2460     */
2461    public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2462      throws IOException {
2463      if (!blockCompressed) {
2464        int length = readRecordLength();
2465        if (length == -1) {
2466          return -1;
2467        }
2468        int keyLength = in.readInt();
2469        int valLength = length - keyLength;
2470        key.write(in, keyLength);
2471        if (decompress) {
2472          CompressedBytes value = (CompressedBytes)val;
2473          value.reset(in, valLength);
2474        } else {
2475          UncompressedBytes value = (UncompressedBytes)val;
2476          value.reset(in, valLength);
2477        }
2478        
2479        return length;
2480      } else {
2481        //Reset syncSeen
2482        syncSeen = false;
2483        
2484        // Read 'key'
2485        if (noBufferedKeys == 0) {
2486          if (in.getPos() >= end) 
2487            return -1;
2488
2489          try { 
2490            readBlock();
2491          } catch (EOFException eof) {
2492            return -1;
2493          }
2494        }
2495        int keyLength = WritableUtils.readVInt(keyLenIn);
2496        if (keyLength < 0) {
          throw new IOException("negative length key found: " + keyLength);
2498        }
2499        key.write(keyIn, keyLength);
2500        --noBufferedKeys;
2501        
2502        // Read raw 'value'
2503        seekToCurrentValue();
2504        int valLength = WritableUtils.readVInt(valLenIn);
2505        UncompressedBytes rawValue = (UncompressedBytes)val;
2506        rawValue.reset(valIn, valLength);
2507        --noBufferedValues;
2508        
2509        return (keyLength+valLength);
2510      }
2511      
2512    }
2513
2514    /**
2515     * Read 'raw' keys.
2516     * @param key - The buffer into which the key is read
2517     * @return Returns the key length or -1 for end of file
2518     * @throws IOException
2519     */
2520    public synchronized int nextRawKey(DataOutputBuffer key) 
2521      throws IOException {
2522      if (!blockCompressed) {
2523        recordLength = readRecordLength();
2524        if (recordLength == -1) {
2525          return -1;
2526        }
2527        keyLength = in.readInt();
2528        key.write(in, keyLength);
2529        return keyLength;
2530      } else {
2531        //Reset syncSeen
2532        syncSeen = false;
2533        
2534        // Read 'key'
2535        if (noBufferedKeys == 0) {
2536          if (in.getPos() >= end) 
2537            return -1;
2538
2539          try { 
2540            readBlock();
2541          } catch (EOFException eof) {
2542            return -1;
2543          }
2544        }
2545        int keyLength = WritableUtils.readVInt(keyLenIn);
2546        if (keyLength < 0) {
          throw new IOException("negative length key found: " + keyLength);
2548        }
2549        key.write(keyIn, keyLength);
2550        --noBufferedKeys;
2551        
2552        return keyLength;
2553      }
2554      
2555    }
2556
2557    /** Read the next key in the file, skipping its
2558     * value.  Return null at end of file. */
2559    public synchronized Object next(Object key) throws IOException {
2560      if (key != null && key.getClass() != getKeyClass()) {
2561        throw new IOException("wrong key class: "+key.getClass().getName()
2562                              +" is not "+keyClass);
2563      }
2564
2565      if (!blockCompressed) {
2566        outBuf.reset();
2567        
2568        keyLength = next(outBuf);
2569        if (keyLength < 0)
2570          return null;
2571        
2572        valBuffer.reset(outBuf.getData(), outBuf.getLength());
2573        
2574        key = deserializeKey(key);
2575        valBuffer.mark(0);
2576        if (valBuffer.getPosition() != keyLength)
2577          throw new IOException(key + " read " + valBuffer.getPosition()
2578                                + " bytes, should read " + keyLength);
2579      } else {
2580        //Reset syncSeen
2581        syncSeen = false;
2582        
2583        if (noBufferedKeys == 0) {
2584          try {
2585            readBlock();
2586          } catch (EOFException eof) {
2587            return null;
2588          }
2589        }
2590        
2591        int keyLength = WritableUtils.readVInt(keyLenIn);
2592        
2593        // Sanity check
2594        if (keyLength < 0) {
2595          return null;
2596        }
2597        
2598        //Read another compressed 'key'
2599        key = deserializeKey(key);
2600        --noBufferedKeys;
2601      }
2602
2603      return key;
2604    }
2605
2606    @SuppressWarnings("unchecked")
2607    private Object deserializeKey(Object key) throws IOException {
2608      return keyDeserializer.deserialize(key);
2609    }
2610
2611    /**
2612     * Read 'raw' values.
2613     * @param val - The 'raw' value
2614     * @return Returns the value length
2615     * @throws IOException
2616     */
2617    public synchronized int nextRawValue(ValueBytes val) 
2618      throws IOException {
2619      
2620      // Position stream to current value
2621      seekToCurrentValue();
2622 
2623      if (!blockCompressed) {
2624        int valLength = recordLength - keyLength;
2625        if (decompress) {
2626          CompressedBytes value = (CompressedBytes)val;
2627          value.reset(in, valLength);
2628        } else {
2629          UncompressedBytes value = (UncompressedBytes)val;
2630          value.reset(in, valLength);
2631        }
2632         
2633        return valLength;
2634      } else {
2635        int valLength = WritableUtils.readVInt(valLenIn);
2636        UncompressedBytes rawValue = (UncompressedBytes)val;
2637        rawValue.reset(valIn, valLength);
2638        --noBufferedValues;
2639        return valLength;
2640      }
2641      
2642    }
2643
2644    private void handleChecksumException(ChecksumException e)
2645      throws IOException {
2646      if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2647        LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2648        sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2649      } else {
2650        throw e;
2651      }
2652    }
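    // A sketch of opting in to skipping corrupt stretches of a file; records
    // in the skipped region are lost:
    //
    //   conf.setBoolean("io.skip.checksum.errors", true);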
2653
    /** Disables sync checking; often invoked for temporary files. */
2655    synchronized void ignoreSync() {
2656      sync = null;
2657    }
2658    
2659    /** Set the current byte position in the input file.
2660     *
2661     * <p>The position passed must be a position returned by {@link
2662     * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2663     * position, use {@link SequenceFile.Reader#sync(long)}.
2664     */
2665    public synchronized void seek(long position) throws IOException {
2666      in.seek(position);
2667      if (blockCompressed) {                      // trigger block read
2668        noBufferedKeys = 0;
2669        valuesDecompressed = true;
2670      }
2671    }
2672
2673    /** Seek to the next sync mark past a given position.*/
2674    public synchronized void sync(long position) throws IOException {
2675      if (position+SYNC_SIZE >= end) {
2676        seek(end);
2677        return;
2678      }
2679
2680      if (position < headerEnd) {
2681        // seek directly to first record
2682        in.seek(headerEnd);
2683        // note the sync marker "seen" in the header
2684        syncSeen = true;
2685        return;
2686      }
2687
2688      try {
2689        seek(position+4);                         // skip escape
2690        in.readFully(syncCheck);
2691        int syncLen = sync.length;
2692        for (int i = 0; in.getPos() < end; i++) {
2693          int j = 0;
2694          for (; j < syncLen; j++) {
2695            if (sync[j] != syncCheck[(i+j)%syncLen])
2696              break;
2697          }
2698          if (j == syncLen) {
2699            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2700            return;
2701          }
2702          syncCheck[i%syncLen] = in.readByte();
2703        }
2704      } catch (ChecksumException e) {             // checksum failure
2705        handleChecksumException(e);
2706      }
2707    }
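    // Split-style scanning sketch (offsets are hypothetical): align to the
    // first sync point after the split start, then read until past the end:
    //
    //   reader.sync(splitStart);
    //   while (reader.getPosition() < splitEnd && reader.next(key, val)) {
    //     // process key/val
    //   }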
2708
2709    /** Returns true iff the previous call to next passed a sync mark.*/
2710    public synchronized boolean syncSeen() { return syncSeen; }
2711
2712    /** Return the current byte position in the input file. */
2713    public synchronized long getPosition() throws IOException {
2714      return in.getPos();
2715    }
2716
2717    /** Returns the name of the file. */
2718    @Override
2719    public String toString() {
2720      return filename;
2721    }
2722
2723  }
2724
2725  /** Sorts key/value pairs in a sequence-format file.
2726   *
2727   * <p>For best performance, applications should make sure that the {@link
2728   * Writable#readFields(DataInput)} implementation of their keys is
2729   * very efficient.  In particular, it should avoid allocating memory.
2730   */
2731  public static class Sorter {
2732
2733    private RawComparator comparator;
2734
2735    private MergeSort mergeSort; //the implementation of merge sort
2736    
2737    private Path[] inFiles;                     // when merging or sorting
2738
2739    private Path outFile;
2740
2741    private int memory; // bytes
2742    private int factor; // merged per pass
2743
2744    private FileSystem fs = null;
2745
2746    private Class keyClass;
2747    private Class valClass;
2748
2749    private Configuration conf;
2750    private Metadata metadata;
2751    
2752    private Progressable progressable = null;
2753
2754    /** Sort and merge files containing the named classes. */
2755    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2756                  Class valClass, Configuration conf)  {
2757      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
2758    }
2759
2760    /** Sort and merge using an arbitrary {@link RawComparator}. */
2761    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2762                  Class valClass, Configuration conf) {
2763      this(fs, comparator, keyClass, valClass, conf, new Metadata());
2764    }
2765
2766    /** Sort and merge using an arbitrary {@link RawComparator}. */
2767    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2768                  Class valClass, Configuration conf, Metadata metadata) {
2769      this.fs = fs;
2770      this.comparator = comparator;
2771      this.keyClass = keyClass;
2772      this.valClass = valClass;
2773      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2774      this.factor = conf.getInt("io.sort.factor", 100);
2775      this.conf = conf;
2776      this.metadata = metadata;
2777    }
2778
2779    /** Set the number of streams to merge at once.*/
2780    public void setFactor(int factor) { this.factor = factor; }
2781
2782    /** Get the number of streams to merge at once.*/
2783    public int getFactor() { return factor; }
2784
2785    /** Set the total amount of buffer memory, in bytes.*/
2786    public void setMemory(int memory) { this.memory = memory; }
2787
2788    /** Get the total amount of buffer memory, in bytes.*/
2789    public int getMemory() { return memory; }
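    // e.g. (illustrative values):
    //   sorter.setFactor(10);                  // merge 10 segments per pass
    //   sorter.setMemory(64 * 1024 * 1024);    // 64MB sort buffer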
2790
2791    /** Set the progressable object in order to report progress. */
2792    public void setProgressable(Progressable progressable) {
2793      this.progressable = progressable;
2794    }
2795    
2796    /** 
2797     * Perform a file sort from a set of input files into an output file.
2798     * @param inFiles the files to be sorted
2799     * @param outFile the sorted output file
2800     * @param deleteInput should the input files be deleted as they are read?
2801     */
2802    public void sort(Path[] inFiles, Path outFile,
2803                     boolean deleteInput) throws IOException {
2804      if (fs.exists(outFile)) {
2805        throw new IOException("already exists: " + outFile);
2806      }
2807
2808      this.inFiles = inFiles;
2809      this.outFile = outFile;
2810
2811      int segments = sortPass(deleteInput);
2812      if (segments > 1) {
2813        mergePass(outFile.getParent());
2814      }
2815    }
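    // An illustrative sort invocation (paths are hypothetical):
    //
    //   SequenceFile.Sorter sorter =
    //       new SequenceFile.Sorter(fs, Text.class, IntWritable.class, conf);
    //   sorter.sort(new Path[]{ new Path("in.seq") }, new Path("out.seq"),
    //               false);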
2816
2817    /** 
2818     * Perform a file sort from a set of input files and return an iterator.
2819     * @param inFiles the files to be sorted
2820     * @param tempDir the directory where temp files are created during sort
2821     * @param deleteInput should the input files be deleted as they are read?
2822     * @return iterator the RawKeyValueIterator
2823     */
2824    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2825                                              boolean deleteInput) throws IOException {
2826      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2827      if (fs.exists(outFile)) {
2828        throw new IOException("already exists: " + outFile);
2829      }
2830      this.inFiles = inFiles;
      //outFile will be used as the prefix for temp files when the sort
      //produces multiple sorted segments. For the single-segment case,
      //outFile itself will contain the sorted data for that segment.
2835      this.outFile = outFile;
2836
2837      int segments = sortPass(deleteInput);
2838      if (segments > 1)
2839        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2840                     tempDir);
2841      else if (segments == 1)
2842        return merge(new Path[]{outFile}, true, tempDir);
2843      else return null;
2844    }
2845
2846    /**
2847     * The backwards compatible interface to sort.
2848     * @param inFile the input file to sort
2849     * @param outFile the sorted output file
2850     */
2851    public void sort(Path inFile, Path outFile) throws IOException {
2852      sort(new Path[]{inFile}, outFile, false);
2853    }
2854    
2855    private int sortPass(boolean deleteInput) throws IOException {
2856      if(LOG.isDebugEnabled()) {
2857        LOG.debug("running sort pass");
2858      }
2859      SortPass sortPass = new SortPass();         // make the SortPass
2860      sortPass.setProgressable(progressable);
2861      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2862      try {
2863        return sortPass.run(deleteInput);         // run it
2864      } finally {
2865        sortPass.close();                         // close it
2866      }
2867    }
2868
2869    private class SortPass {
2870      private int memoryLimit = memory/4;
2871      private int recordLimit = 1000000;
2872      
2873      private DataOutputBuffer rawKeys = new DataOutputBuffer();
2874      private byte[] rawBuffer;
2875
2876      private int[] keyOffsets = new int[1024];
2877      private int[] pointers = new int[keyOffsets.length];
2878      private int[] pointersCopy = new int[keyOffsets.length];
2879      private int[] keyLengths = new int[keyOffsets.length];
2880      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2881      
2882      private ArrayList segmentLengths = new ArrayList();
2883      
2884      private Reader in = null;
2885      private FSDataOutputStream out = null;
2886      private FSDataOutputStream indexOut = null;
2887      private Path outName;
2888
2889      private Progressable progressable = null;
2890
2891      public int run(boolean deleteInput) throws IOException {
2892        int segments = 0;
2893        int currentFile = 0;
2894        boolean atEof = (currentFile >= inFiles.length);
2895        CompressionType compressionType;
2896        CompressionCodec codec = null;
2897        segmentLengths.clear();
2898        if (atEof) {
2899          return 0;
2900        }
2901        
2902        // Initialize
2903        in = new Reader(fs, inFiles[currentFile], conf);
2904        compressionType = in.getCompressionType();
2905        codec = in.getCompressionCodec();
2906        
2907        for (int i=0; i < rawValues.length; ++i) {
2908          rawValues[i] = null;
2909        }
2910        
2911        while (!atEof) {
2912          int count = 0;
2913          int bytesProcessed = 0;
2914          rawKeys.reset();
2915          while (!atEof && 
2916                 bytesProcessed < memoryLimit && count < recordLimit) {
2917
2918            // Read a record into buffer
2919            // Note: Attempt to re-use 'rawValue' as far as possible
2920            int keyOffset = rawKeys.getLength();       
2921            ValueBytes rawValue = 
2922              (count == keyOffsets.length || rawValues[count] == null) ? 
2923              in.createValueBytes() : 
2924              rawValues[count];
2925            int recordLength = in.nextRaw(rawKeys, rawValue);
2926            if (recordLength == -1) {
2927              in.close();
2928              if (deleteInput) {
2929                fs.delete(inFiles[currentFile], true);
2930              }
2931              currentFile += 1;
2932              atEof = currentFile >= inFiles.length;
2933              if (!atEof) {
2934                in = new Reader(fs, inFiles[currentFile], conf);
2935              } else {
2936                in = null;
2937              }
2938              continue;
2939            }
2940
2941            int keyLength = rawKeys.getLength() - keyOffset;
2942
2943            if (count == keyOffsets.length)
2944              grow();
2945
2946            keyOffsets[count] = keyOffset;                // update pointers
2947            pointers[count] = count;
2948            keyLengths[count] = keyLength;
2949            rawValues[count] = rawValue;
2950
2951            bytesProcessed += recordLength; 
2952            count++;
2953          }
2954
2955          // buffer is full -- sort & flush it
2956          if(LOG.isDebugEnabled()) {
2957            LOG.debug("flushing segment " + segments);
2958          }
2959          rawBuffer = rawKeys.getData();
2960          sort(count);
2961          // indicate we're making progress
2962          if (progressable != null) {
2963            progressable.progress();
2964          }
2965          flush(count, bytesProcessed, compressionType, codec, 
2966                segments==0 && atEof);
2967          segments++;
2968        }
2969        return segments;
2970      }
2971
2972      public void close() throws IOException {
2973        if (in != null) {
2974          in.close();
2975        }
2976        if (out != null) {
2977          out.close();
2978        }
2979        if (indexOut != null) {
2980          indexOut.close();
2981        }
2982      }
2983
2984      private void grow() {
2985        int newLength = keyOffsets.length * 3 / 2;
2986        keyOffsets = grow(keyOffsets, newLength);
2987        pointers = grow(pointers, newLength);
2988        pointersCopy = new int[newLength];
2989        keyLengths = grow(keyLengths, newLength);
2990        rawValues = grow(rawValues, newLength);
2991      }
2992
2993      private int[] grow(int[] old, int newLength) {
2994        int[] result = new int[newLength];
2995        System.arraycopy(old, 0, result, 0, old.length);
2996        return result;
2997      }
2998      
2999      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
3000        ValueBytes[] result = new ValueBytes[newLength];
3001        System.arraycopy(old, 0, result, 0, old.length);
3002        for (int i=old.length; i < newLength; ++i) {
3003          result[i] = null;
3004        }
3005        return result;
3006      }
3007
3008      private void flush(int count, int bytesProcessed, 
3009                         CompressionType compressionType, 
3010                         CompressionCodec codec, 
3011                         boolean done) throws IOException {
3012        if (out == null) {
3013          outName = done ? outFile : outFile.suffix(".0");
3014          out = fs.create(outName);
3015          if (!done) {
3016            indexOut = fs.create(outName.suffix(".index"));
3017          }
3018        }
3019
3020        long segmentStart = out.getPos();
3021        Writer writer = createWriter(conf, Writer.stream(out), 
3022            Writer.keyClass(keyClass), Writer.valueClass(valClass),
3023            Writer.compression(compressionType, codec),
3024            Writer.metadata(done ? metadata : new Metadata()));
3025        
3026        if (!done) {
3027          writer.sync = null;                     // disable sync on temp files
3028        }
3029
3030        for (int i = 0; i < count; i++) {         // write in sorted order
3031          int p = pointers[i];
3032          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
3033        }
3034        writer.close();
3035        
3036        if (!done) {
3037          // Save the segment length
3038          WritableUtils.writeVLong(indexOut, segmentStart);
3039          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
3040          indexOut.flush();
3041        }
3042      }
3043
      private void sort(int count) {
        // sort indirectly: MergeSort reads from the scratch copy and leaves
        // the sorted record order in 'pointers'
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
      }
      class SeqFileComparator implements Comparator<IntWritable> {
        @Override
        public int compare(IntWritable i, IntWritable j) {
          return comparator.compare(rawBuffer, keyOffsets[i.get()], 
                                    keyLengths[i.get()], rawBuffer, 
                                    keyOffsets[j.get()], keyLengths[j.get()]);
        }
      }
3056      
      /** Set the progressable object in order to report progress. */
      public void setProgressable(Progressable progressable) {
        this.progressable = progressable;
      }
3062      
3063    } // SequenceFile.Sorter.SortPass
3064
3065    /** The interface to iterate over raw keys/values of SequenceFiles. */
3066    public static interface RawKeyValueIterator {
      /** Gets the current raw key.
       * @return the current raw key as a DataOutputBuffer
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException; 
      /** Gets the current raw value.
       * @return the current raw value as ValueBytes 
       * @throws IOException
       */
      ValueBytes getValue() throws IOException; 
      /** Sets up the current key and value (for getKey and getValue).
       * @return true if there exists a key/value, false otherwise 
       * @throws IOException
       */
      boolean next() throws IOException;
      /** Closes the iterator so that the underlying streams can be closed.
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; this has a float (0.0 - 1.0) 
       * indicating the fraction of the total bytes processed by the
       * iterator so far.
       */
      Progress getProgress();
3090    }    
3091    
3092    /**
3093     * Merges the list of segments of type <code>SegmentDescriptor</code>
3094     * @param segments the list of SegmentDescriptors
3095     * @param tmpDir the directory to write temporary files into
3096     * @return RawKeyValueIterator
3097     * @throws IOException
3098     */
3099    public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
3100                                     Path tmpDir) 
3101      throws IOException {
3102      // pass in object to report progress, if present
3103      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3104      return mQueue.merge();
3105    }
3106
3107    /**
3108     * Merges the contents of files passed in Path[] using a max factor value
3109     * that is already set
3110     * @param inNames the array of path names
3111     * @param deleteInputs true if the input files should be deleted when 
3112     * unnecessary
3113     * @param tmpDir the directory to write temporary files into
3114     * @return RawKeyValueIteratorMergeQueue
3115     * @throws IOException
3116     */
3117    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3118                                     Path tmpDir) 
3119      throws IOException {
3120      return merge(inNames, deleteInputs, 
3121                   (inNames.length < factor) ? inNames.length : factor,
3122                   tmpDir);
3123    }
3124
3125    /**
3126     * Merges the contents of files passed in Path[]
3127     * @param inNames the array of path names
3128     * @param deleteInputs true if the input files should be deleted when 
3129     * unnecessary
3130     * @param factor the factor that will be used as the maximum merge fan-in
3131     * @param tmpDir the directory to write temporary files into
3132     * @return RawKeyValueIteratorMergeQueue
3133     * @throws IOException
3134     */
3135    public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3136                                     int factor, Path tmpDir) 
3137      throws IOException {
3138      //get the segments from inNames
3139      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3140      for (int i = 0; i < inNames.length; i++) {
3141        SegmentDescriptor s = new SegmentDescriptor(0,
3142            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3143        s.preserveInput(!deleteInputs);
3144        s.doSync();
3145        a.add(s);
3146      }
3147      this.factor = factor;
3148      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3149      return mQueue.merge();
3150    }
3151
3152    /**
3153     * Merges the contents of files passed in Path[]
3154     * @param inNames the array of path names
3155     * @param tempDir the directory for creating temp files during merge
3156     * @param deleteInputs true if the input files should be deleted when 
3157     * unnecessary
3158     * @return RawKeyValueIteratorMergeQueue
3159     * @throws IOException
3160     */
3161    public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3162                                     boolean deleteInputs) 
3163      throws IOException {
      //outFile will be used as the prefix for the temp files holding the
      //intermediate merge outputs           
3166      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3167      //get the segments from inNames
3168      ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3169      for (int i = 0; i < inNames.length; i++) {
3170        SegmentDescriptor s = new SegmentDescriptor(0,
3171            fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3172        s.preserveInput(!deleteInputs);
3173        s.doSync();
3174        a.add(s);
3175      }
3176      factor = (inNames.length < factor) ? inNames.length : factor;
3177      // pass in object to report progress, if present
3178      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3179      return mQueue.merge();
3180    }
3181
3182    /**
3183     * Clones the attributes (like compression of the input file and creates a 
3184     * corresponding Writer
3185     * @param inputFile the path of the input file whose attributes should be 
3186     * cloned
3187     * @param outputFile the path of the output file 
3188     * @param prog the Progressable to report status during the file write
3189     * @return Writer
3190     * @throws IOException
3191     */
3192    public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3193                                      Progressable prog) throws IOException {
3194      Reader reader = new Reader(conf,
3195                                 Reader.file(inputFile),
3196                                 new Reader.OnlyHeaderOption());
3197      CompressionType compress = reader.getCompressionType();
3198      CompressionCodec codec = reader.getCompressionCodec();
3199      reader.close();
3200
3201      Writer writer = createWriter(conf, 
3202                                   Writer.file(outputFile), 
3203                                   Writer.keyClass(keyClass), 
3204                                   Writer.valueClass(valClass), 
3205                                   Writer.compression(compress, codec), 
3206                                   Writer.progressable(prog));
3207      return writer;
3208    }
3209
3210    /**
3211     * Writes records from RawKeyValueIterator into a file represented by the 
3212     * passed writer
3213     * @param records the RawKeyValueIterator
3214     * @param writer the Writer created earlier 
3215     * @throws IOException
3216     */
3217    public void writeFile(RawKeyValueIterator records, Writer writer) 
3218      throws IOException {
3219      while(records.next()) {
3220        writer.appendRaw(records.getKey().getData(), 0, 
3221                         records.getKey().getLength(), records.getValue());
3222      }
3223      writer.sync();
3224    }
3225        
3226    /** Merge the provided files.
3227     * @param inFiles the array of input path names
3228     * @param outFile the final output file
3229     * @throws IOException
3230     */
3231    public void merge(Path[] inFiles, Path outFile) throws IOException {
3232      if (fs.exists(outFile)) {
3233        throw new IOException("already exists: " + outFile);
3234      }
3235      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3236      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3237      
3238      writeFile(r, writer);
3239
3240      writer.close();
3241    }
3242
    /** Called by sort() to generate the final merged output. */
3244    private int mergePass(Path tmpDir) throws IOException {
3245      if(LOG.isDebugEnabled()) {
3246        LOG.debug("running merge pass");
3247      }
3248      Writer writer = cloneFileAttributes(
3249                                          outFile.suffix(".0"), outFile, null);
3250      RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3251                                    outFile.suffix(".0.index"), tmpDir);
3252      writeFile(r, writer);
3253
3254      writer.close();
3255      return 0;
3256    }
3257
3258    /** Used by mergePass to merge the output of the sort
3259     * @param inName the name of the input file containing sorted segments
3260     * @param indexIn the offsets of the sorted segments
3261     * @param tmpDir the relative directory to store intermediate results in
3262     * @return RawKeyValueIterator
3263     * @throws IOException
3264     */
3265    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3266      throws IOException {
3267      //get the segments from indexIn
3268      //we create a SegmentContainer so that we can track segments belonging to
3269      //inName and delete inName as soon as we see that we have looked at all
3270      //the contained segments during the merge process & hence don't need 
3271      //them anymore
3272      SegmentContainer container = new SegmentContainer(inName, indexIn);
3273      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3274      return mQueue.merge();
3275    }
3276    
3277    /** This class implements the core of the merge logic */
3278    private class MergeQueue extends PriorityQueue 
3279      implements RawKeyValueIterator {
3280      private boolean compress;
3281      private boolean blockCompress;
3282      private DataOutputBuffer rawKey = new DataOutputBuffer();
3283      private ValueBytes rawValue;
3284      private long totalBytesProcessed;
3285      private float progPerByte;
3286      private Progress mergeProgress = new Progress();
3287      private Path tmpDir;
3288      private Progressable progress = null; //handle to the progress reporting object
3289      private SegmentDescriptor minSegment;
3290      
      //a TreeMap used to store the segments sorted by size (segment offset
      //and segment path name are used to break ties between segments of the
      //same size)
3293      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
3294        new TreeMap<SegmentDescriptor, Void>();
3295            
3296      @SuppressWarnings("unchecked")
3297      public void put(SegmentDescriptor stream) throws IOException {
3298        if (size() == 0) {
3299          compress = stream.in.isCompressed();
3300          blockCompress = stream.in.isBlockCompressed();
3301        } else if (compress != stream.in.isCompressed() || 
3302                   blockCompress != stream.in.isBlockCompressed()) {
3303          throw new IOException("All merged files must be compressed or not.");
3304        } 
3305        super.put(stream);
3306      }
3307      
3308      /**
3309       * A queue of file segments to merge
3310       * @param segments the file segments to merge
3311       * @param tmpDir a relative local directory to save intermediate files in
3312       * @param progress the reference to the Progressable object
3313       */
3314      public MergeQueue(List <SegmentDescriptor> segments,
3315          Path tmpDir, Progressable progress) {
3316        int size = segments.size();
3317        for (int i = 0; i < size; i++) {
3318          sortedSegmentSizes.put(segments.get(i), null);
3319        }
3320        this.tmpDir = tmpDir;
3321        this.progress = progress;
3322      }
3323      @Override
3324      protected boolean lessThan(Object a, Object b) {
3325        // indicate we're making progress
3326        if (progress != null) {
3327          progress.progress();
3328        }
3329        SegmentDescriptor msa = (SegmentDescriptor)a;
3330        SegmentDescriptor msb = (SegmentDescriptor)b;
3331        return comparator.compare(msa.getKey().getData(), 0, 
3332                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
3333                                  msb.getKey().getLength()) < 0;
3334      }
3335      @Override
3336      public void close() throws IOException {
3337        SegmentDescriptor ms;                           // close inputs
3338        while ((ms = (SegmentDescriptor)pop()) != null) {
3339          ms.cleanup();
3340        }
3341        minSegment = null;
3342      }
3343      @Override
3344      public DataOutputBuffer getKey() throws IOException {
3345        return rawKey;
3346      }
3347      @Override
3348      public ValueBytes getValue() throws IOException {
3349        return rawValue;
3350      }
3351      @Override
3352      public boolean next() throws IOException {
3353        if (size() == 0)
3354          return false;
3355        if (minSegment != null) {
3356          //minSegment is non-null for all invocations of next except the first
3357          //one. For the first invocation, the priority queue is ready for use
3358          //but for the subsequent invocations, first adjust the queue 
3359          adjustPriorityQueue(minSegment);
3360          if (size() == 0) {
3361            minSegment = null;
3362            return false;
3363          }
3364        }
3365        minSegment = (SegmentDescriptor)top();
3366        long startPos = minSegment.in.getPosition(); // Current position in stream
3367        //save the raw key reference
3368        rawKey = minSegment.getKey();
3369        //load the raw value. Re-use the existing rawValue buffer
3370        if (rawValue == null) {
3371          rawValue = minSegment.in.createValueBytes();
3372        }
3373        minSegment.nextRawValue(rawValue);
3374        long endPos = minSegment.in.getPosition(); // End position after reading value
3375        updateProgress(endPos - startPos);
3376        return true;
3377      }
3378      
3379      @Override
3380      public Progress getProgress() {
3381        return mergeProgress; 
3382      }
3383
3384      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3385        long startPos = ms.in.getPosition(); // Current position in stream
3386        boolean hasNext = ms.nextRawKey();
3387        long endPos = ms.in.getPosition(); // End position after reading key
3388        updateProgress(endPos - startPos);
3389        if (hasNext) {
3390          adjustTop();
3391        } else {
3392          pop();
3393          ms.cleanup();
3394        }
3395      }
3396
3397      private void updateProgress(long bytesProcessed) {
3398        totalBytesProcessed += bytesProcessed;
3399        if (progPerByte > 0) {
3400          mergeProgress.set(totalBytesProcessed * progPerByte);
3401        }
3402      }
3403      
      /** This is the single-level merge that is called multiple times 
       * depending on the factor and the number of segments
3406       * @return RawKeyValueIterator
3407       * @throws IOException
3408       */
3409      public RawKeyValueIterator merge() throws IOException {
3410        //create the MergeStreams from the sorted map created in the constructor
3411        //and dump the final output to a file
3412        int numSegments = sortedSegmentSizes.size();
3413        int origFactor = factor;
3414        int passNo = 1;
3415        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3416        do {
3417          //get the factor for this pass of merge
3418          factor = getPassFactor(passNo, numSegments);
3419          List<SegmentDescriptor> segmentsToMerge =
3420            new ArrayList<SegmentDescriptor>();
3421          int segmentsConsidered = 0;
3422          int numSegmentsToConsider = factor;
3423          while (true) {
3424            //extract the smallest 'factor' number of segment pointers from the 
3425            //TreeMap. Call cleanup on the empty segments (no key/value data)
3426            SegmentDescriptor[] mStream = 
3427              getSegmentDescriptors(numSegmentsToConsider);
3428            for (int i = 0; i < mStream.length; i++) {
3429              if (mStream[i].nextRawKey()) {
3430                segmentsToMerge.add(mStream[i]);
3431                segmentsConsidered++;
3432                // Count the fact that we read some bytes in calling nextRawKey()
3433                updateProgress(mStream[i].in.getPosition());
3434              }
3435              else {
3436                mStream[i].cleanup();
3437                numSegments--; //we ignore this segment for the merge
3438              }
3439            }
3440            //if we have the desired number of segments
3441            //or looked at all available segments, we break
3442            if (segmentsConsidered == factor || 
3443                sortedSegmentSizes.size() == 0) {
3444              break;
3445            }
3446              
3447            numSegmentsToConsider = factor - segmentsConsidered;
3448          }
3449          //feed the streams to the priority queue
3450          initialize(segmentsToMerge.size()); clear();
3451          for (int i = 0; i < segmentsToMerge.size(); i++) {
3452            put(segmentsToMerge.get(i));
3453          }
          //if the number of remaining segments is at most the factor, just
          //return the iterator; otherwise do another single-level merge
3456          if (numSegments <= factor) {
3457            //calculate the length of the remaining segments. Required for 
3458            //calculating the merge progress
3459            long totalBytes = 0;
3460            for (int i = 0; i < segmentsToMerge.size(); i++) {
3461              totalBytes += segmentsToMerge.get(i).segmentLength;
3462            }
3463            if (totalBytes != 0) //being paranoid
3464              progPerByte = 1.0f / (float)totalBytes;
3465            //reset factor to what it originally was
3466            factor = origFactor;
3467            return this;
3468          } else {
            //spread the creation of temp files across multiple disks if 
            //available, subject to the space constraints
3471            long approxOutputSize = 0; 
3472            for (SegmentDescriptor s : segmentsToMerge) {
3473              approxOutputSize += s.segmentLength + 
3474                                  ChecksumFileSystem.getApproxChkSumLength(
3475                                  s.segmentLength);
3476            }
3477            Path tmpFilename = 
3478              new Path(tmpDir, "intermediate").suffix("." + passNo);
3479
3480            Path outputFile =  lDirAlloc.getLocalPathForWrite(
3481                                                tmpFilename.toString(),
3482                                                approxOutputSize, conf);
3483            if(LOG.isDebugEnabled()) { 
3484              LOG.debug("writing intermediate results to " + outputFile);
3485            }
3486            Writer writer = cloneFileAttributes(
3487                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3488                                                fs.makeQualified(outputFile), null);
3489            writer.sync = null; //disable sync for temp files
3490            writeFile(this, writer);
3491            writer.close();
3492            
3493            //we finished one single level merge; now clean up the priority 
3494            //queue
3495            this.close();
3496            
3497            SegmentDescriptor tempSegment = 
3498              new SegmentDescriptor(0,
3499                  fs.getFileStatus(outputFile).getLen(), outputFile);
3500            //put the segment back in the TreeMap
3501            sortedSegmentSizes.put(tempSegment, null);
3502            numSegments = sortedSegmentSizes.size();
3503            passNo++;
3504          }
          //only the first pass may use a reduced merge factor, so reset the 
          //factor to what it originally was
3507          factor = origFactor;
3508        } while(true);
3509      }
3510  
      /** Returns the number of segments to merge in the given pass; see
       * HADOOP-591. Passes after the first merge exactly 'factor' segments,
       * so the first pass merges just enough segments to make that possible.
       */
      public int getPassFactor(int passNo, int numSegments) {
3513        if (passNo > 1 || numSegments <= factor || factor == 1) 
3514          return factor;
3515        int mod = (numSegments - 1) % (factor - 1);
3516        if (mod == 0)
3517          return factor;
3518        return mod + 1;
3519      }
3520      
3521      /** Return (& remove) the requested number of segment descriptors from the
3522       * sorted map.
3523       */
3524      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
3525        if (numDescriptors > sortedSegmentSizes.size())
3526          numDescriptors = sortedSegmentSizes.size();
3527        SegmentDescriptor[] SegmentDescriptors = 
3528          new SegmentDescriptor[numDescriptors];
3529        Iterator iter = sortedSegmentSizes.keySet().iterator();
3530        int i = 0;
3531        while (i < numDescriptors) {
3532          SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
3533          iter.remove();
3534        }
3535        return SegmentDescriptors;
3536      }
3537    } // SequenceFile.Sorter.MergeQueue
3538
    /** This class defines a merge segment. This class can be subclassed to 
     * provide a customized cleanup method implementation. In this 
     * implementation, cleanup closes the file handle and deletes the file. 
     */
3543    public class SegmentDescriptor implements Comparable {
3544      
3545      long segmentOffset; //the start of the segment in the file
3546      long segmentLength; //the length of the segment
3547      Path segmentPathName; //the path name of the file containing the segment
3548      boolean ignoreSync = true; //set to true for temp files
3549      private Reader in = null; 
3550      private DataOutputBuffer rawKey = null; //this will hold the current key
3551      private boolean preserveInput = false; //delete input segment files?
3552      
3553      /** Constructs a segment
3554       * @param segmentOffset the offset of the segment in the file
3555       * @param segmentLength the length of the segment
3556       * @param segmentPathName the path name of the file containing the segment
3557       */
3558      public SegmentDescriptor (long segmentOffset, long segmentLength, 
3559                                Path segmentPathName) {
3560        this.segmentOffset = segmentOffset;
3561        this.segmentLength = segmentLength;
3562        this.segmentPathName = segmentPathName;
3563      }
3564      
      /** Enables sync checks while reading this segment (they are skipped
       * by default, e.g. for temp files). */
      public void doSync() {ignoreSync = false;}
      
      /** Sets whether the input file should be preserved (i.e. not deleted)
       * when it is no longer needed. */
      public void preserveInput(boolean preserve) {
        preserveInput = preserve;
      }
3572
3573      public boolean shouldPreserveInput() {
3574        return preserveInput;
3575      }
3576      
3577      @Override
3578      public int compareTo(Object o) {
3579        SegmentDescriptor that = (SegmentDescriptor)o;
3580        if (this.segmentLength != that.segmentLength) {
3581          return (this.segmentLength < that.segmentLength ? -1 : 1);
3582        }
3583        if (this.segmentOffset != that.segmentOffset) {
3584          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3585        }
3586        return (this.segmentPathName.toString()).
3587          compareTo(that.segmentPathName.toString());
3588      }
3589
3590      @Override
3591      public boolean equals(Object o) {
3592        if (!(o instanceof SegmentDescriptor)) {
3593          return false;
3594        }
3595        SegmentDescriptor that = (SegmentDescriptor)o;
3596        if (this.segmentLength == that.segmentLength &&
3597            this.segmentOffset == that.segmentOffset &&
3598            this.segmentPathName.toString().equals(
3599              that.segmentPathName.toString())) {
3600          return true;
3601        }
3602        return false;
3603      }
3604
3605      @Override
3606      public int hashCode() {
3607        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3608      }
3609
3610      /** Fills up the rawKey object with the key returned by the Reader
3611       * @return true if there is a key returned; false, otherwise
3612       * @throws IOException
3613       */
3614      public boolean nextRawKey() throws IOException {
3615        if (in == null) {
3616          int bufferSize = getBufferSize(conf); 
3617          Reader reader = new Reader(conf,
3618                                     Reader.file(segmentPathName), 
3619                                     Reader.bufferSize(bufferSize),
3620                                     Reader.start(segmentOffset), 
3621                                     Reader.length(segmentLength));
3622        
3623          //sometimes we ignore syncs especially for temp merge files
3624          if (ignoreSync) reader.ignoreSync();
3625
3626          if (reader.getKeyClass() != keyClass)
3627            throw new IOException("wrong key class: " + reader.getKeyClass() +
3628                                  " is not " + keyClass);
3629          if (reader.getValueClass() != valClass)
3630            throw new IOException("wrong value class: "+reader.getValueClass()+
3631                                  " is not " + valClass);
3632          this.in = reader;
3633          rawKey = new DataOutputBuffer();
3634        }
3635        rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
3638        return (keyLength >= 0);
3639      }
3640
      /** Fills up the passed rawValue with the value corresponding to the key
       * read earlier.
       * @param rawValue the buffer to read the raw value into
       * @return the length of the value
       * @throws IOException
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        return in.nextRawValue(rawValue);
      }
3651      
3652      /** Returns the stored rawKey */
3653      public DataOutputBuffer getKey() {
3654        return rawKey;
3655      }
3656      
      /** Closes the underlying reader. */
3658      private void close() throws IOException {
3659        this.in.close();
3660        this.in = null;
3661      }
3662
3663      /** The default cleanup. Subclasses can override this with a custom 
3664       * cleanup 
3665       */
3666      public void cleanup() throws IOException {
3667        close();
3668        if (!preserveInput) {
3669          fs.delete(segmentPathName, true);
3670        }
3671      }
3672    } // SequenceFile.Sorter.SegmentDescriptor
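
    /* Sketch of the customized cleanup mentioned above: a subclass that
     * archives spent segments instead of deleting them. 'archiveDir' is a
     * hypothetical destination and error handling is elided.
     *
     *   class ArchivingSegmentDescriptor extends SegmentDescriptor {
     *     ArchivingSegmentDescriptor(long off, long len, Path p) {
     *       super(off, len, p);
     *     }
     *     @Override
     *     public void cleanup() throws IOException {
     *       preserveInput(true);          // keep the file...
     *       super.cleanup();              // ...but still close the reader
     *       fs.rename(segmentPathName,
     *                 new Path(archiveDir, segmentPathName.getName()));
     *     }
     *   }
     */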
3673    
    /** This class represents multiple segments contained within a single
     *  file.
     */
3677    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3678
3679      SegmentContainer parentContainer = null;
3680
3681      /** Constructs a segment
3682       * @param segmentOffset the offset of the segment in the file
3683       * @param segmentLength the length of the segment
3684       * @param segmentPathName the path name of the file containing the segment
3685       * @param parent the parent SegmentContainer that holds the segment
3686       */
3687      public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3688                                       Path segmentPathName, SegmentContainer parent) {
3689        super(segmentOffset, segmentLength, segmentPathName);
3690        this.parentContainer = parent;
3691      }
      /** Closes this segment and notifies the parent container, which deletes
       * the shared file once all its contained segments have been cleaned up.
       */
3695      @Override
3696      public void cleanup() throws IOException {
3697        super.close();
3698        if (super.shouldPreserveInput()) return;
3699        parentContainer.cleanup();
3700      }
3701      
3702      @Override
3703      public boolean equals(Object o) {
3704        if (!(o instanceof LinkedSegmentsDescriptor)) {
3705          return false;
3706        }
3707        return super.equals(o);
3708      }
3709    } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3710
    /** The class that defines a container for segments to be merged. Primarily
     * required to delete temp files as soon as all the contained segments
     * have been looked at. */
3714    private class SegmentContainer {
3715      private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3716      private int numSegmentsContained; //# of segments contained
3717      private Path inName; //input file from where segments are created
3718      
3719      //the list of segments read from the file
3720      private ArrayList <SegmentDescriptor> segments = 
3721        new ArrayList <SegmentDescriptor>();
      /** This constructor exists primarily to serve the sort routine that 
       * generates a single output file with an associated index file. */
3724      public SegmentContainer(Path inName, Path indexIn) throws IOException {
3725        //get the segments from indexIn
3726        FSDataInputStream fsIndexIn = fs.open(indexIn);
3727        long end = fs.getFileStatus(indexIn).getLen();
3728        while (fsIndexIn.getPos() < end) {
3729          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3730          long segmentLength = WritableUtils.readVLong(fsIndexIn);
3731          Path segmentName = inName;
3732          segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3733                                                    segmentLength, segmentName, this));
3734        }
3735        fsIndexIn.close();
3736        fs.delete(indexIn, true);
3737        numSegmentsContained = segments.size();
3738        this.inName = inName;
3739      }
3740
3741      public List <SegmentDescriptor> getSegmentList() {
3742        return segments;
3743      }
3744      public void cleanup() throws IOException {
3745        numSegmentsCleanedUp++;
3746        if (numSegmentsCleanedUp == numSegmentsContained) {
3747          fs.delete(inName, true);
3748        }
3749      }
3750    } //SequenceFile.Sorter.SegmentContainer
3751
3752  } // SequenceFile.Sorter
3753
3754} // SequenceFile