/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.*;
import java.util.*;
import java.rmi.server.UID;
import java.security.MessageDigest;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.hadoop.util.Time;

/** 
 * <code>SequenceFile</code>s are flat files consisting of binary key/value 
 * pairs.
 * 
 * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
 * {@link Sorter} classes for writing, reading and sorting respectively.</p>
 * 
 * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
 * {@link CompressionType} used to compress key/value pairs:
 * <ol>
 *   <li>
 *   <code>Writer</code> : Uncompressed records.
 *   </li>
 *   <li>
 *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
 *                                       values.
 *   </li>
 *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys & 
 *                                      values are collected in 'blocks' 
 *                                      separately and compressed. The size of 
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
 * 
 * <p>The actual compression algorithm used to compress key and/or values can be
 * specified by using the appropriate {@link CompressionCodec}.</p>
 * 
 * <p>The recommended way is to use the static <code>createWriter</code> methods
 * provided by <code>SequenceFile</code> to choose the preferred format.</p>
 *
 * <p>The {@link Reader} acts as the bridge and can read any of the above 
 * <code>SequenceFile</code> formats.</p>
 *
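 * <p>For example, a writer might be created and used as follows (a minimal
 * sketch; the <code>conf</code> and <code>path</code> values are assumed to
 * be supplied by the caller):</p>
 * <pre>{@code
 * SequenceFile.Writer writer = SequenceFile.createWriter(conf,
 *     SequenceFile.Writer.file(path),
 *     SequenceFile.Writer.keyClass(LongWritable.class),
 *     SequenceFile.Writer.valueClass(Text.class));
 * try {
 *   writer.append(new LongWritable(1), new Text("first record"));
 * } finally {
 *   writer.close();
 * }
 * }</pre>
 *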
 * <h4 id="Formats">SequenceFile Formats</h4>
 * 
 * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
 * depending on the <code>CompressionType</code> specified. All of them share a
 * <a href="#Header">common header</a> described below; a short sketch of
 * checking the header's magic bytes follows the list.</p>
 * 
 * <h5 id="Header">SequenceFile Header</h5>
 * <ul>
 *   <li>
 *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
 *             version number (e.g. SEQ4 or SEQ6)
 *   </li>
 *   <li>
 *   keyClassName - key class
 *   </li>
 *   <li>
 *   valueClassName - value class
 *   </li>
 *   <li>
 *   compression - A boolean which specifies if compression is turned on for 
 *                 keys/values in this file.
 *   </li>
 *   <li>
 *   blockCompression - A boolean which specifies if block-compression is 
 *                      turned on for keys/values in this file.
 *   </li>
 *   <li>
 *   compression codec - <code>CompressionCodec</code> class which is used for  
 *                       compression of keys and/or values (if compression is 
 *                       enabled).
 *   </li>
 *   <li>
 *   metadata - {@link Metadata} for this file.
 *   </li>
 *   <li>
 *   sync - A sync marker to denote the end of the header.
 *   </li>
 * </ul>
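 *
 * <p>As noted above, the first four bytes identify the file and its version.
 * A minimal sketch of checking them by hand (<code>fs</code> and
 * <code>path</code> are assumed to be supplied by the caller):</p>
 * <pre>{@code
 * FSDataInputStream in = fs.open(path);
 * byte[] magic = new byte[4];
 * in.readFully(magic);                 // 'S', 'E', 'Q', version
 * boolean isSequenceFile =
 *     magic[0] == 'S' && magic[1] == 'E' && magic[2] == 'Q';
 * byte version = magic[3];             // e.g. 6 for files with metadata
 * in.close();
 * }</pre>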
 * 
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li>Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record
 *   <ul>
 *     <li>Record length</li>
 *     <li>Key length</li>
 *     <li>Key</li>
 *     <li><i>Compressed</i> Value</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker approximately every {@link #SYNC_INTERVAL} bytes.
 * </li>
 * </ul>
 * 
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
 * <ul>
 * <li>
 * <a href="#Header">Header</a>
 * </li>
 * <li>
 * Record <i>Block</i>
 *   <ul>
 *     <li>Uncompressed number of records in the block</li>
 *     <li>Compressed key-lengths block-size</li>
 *     <li>Compressed key-lengths block</li>
 *     <li>Compressed keys block-size</li>
 *     <li>Compressed keys block</li>
 *     <li>Compressed value-lengths block-size</li>
 *     <li>Compressed value-lengths block</li>
 *     <li>Compressed values block-size</li>
 *     <li>Compressed values block</li>
 *   </ul>
 * </li>
 * <li>
 * A sync-marker every block.
 * </li>
 * </ul>
 * 
 * <p>The compressed blocks of key lengths and value lengths consist of the 
 * actual lengths of individual keys/values encoded in zero-compressed
 * integer (VInt) format.</p>
 * 
 * @see CompressionCodec
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFile {
  private static final Log LOG = LogFactory.getLog(SequenceFile.class);

  private SequenceFile() {}                         // no public ctor

  private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  private static final byte VERSION_WITH_METADATA = (byte)6;
  private static byte[] VERSION = new byte[] {
    (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  };

  private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash
  private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash

  /** The number of bytes between sync points. */
  public static final int SYNC_INTERVAL = 100*SYNC_SIZE;

  /** 
   * The compression type used to compress key/value pairs in the 
   * {@link SequenceFile}.
   * 
   * @see SequenceFile.Writer
   */
  public static enum CompressionType {
    /** Do not compress records. */
    NONE, 
    /** Compress values only, each separately. */
    RECORD,
    /** Compress sequences of records together in blocks. */
    BLOCK
  }
  /**
   * Get the default compression type for sequence files.
   * @param job the job config to look in
   * @return the kind of compression to use
   */
  static public CompressionType getDefaultCompressionType(Configuration job) {
    String name = job.get("io.seqfile.compression.type");
    return name == null ? CompressionType.RECORD : 
      CompressionType.valueOf(name);
  }
  
  /**
   * Set the default compression type for sequence files.
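   * <p>This simply records the value under the
   * <code>io.seqfile.compression.type</code> key, so the following two
   * calls are equivalent (a small illustration):</p>
   * <pre>{@code
   * SequenceFile.setDefaultCompressionType(conf, CompressionType.BLOCK);
   * conf.set("io.seqfile.compression.type", "BLOCK");
   * }</pre>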
   * @param job the configuration to modify
   * @param val the new compression type (none, block, record)
   */
  static public void setDefaultCompressionType(Configuration job, 
                                               CompressionType val) {
    job.set("io.seqfile.compression.type", val.toString());
  }

  /**
   * Create a new Writer with the given options.
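   * <p>For example, to create a block-compressed writer (a sketch;
   * <code>path</code> is assumed to be supplied by the caller):</p>
   * <pre>{@code
   * Writer writer = SequenceFile.createWriter(conf,
   *     Writer.file(path),
   *     Writer.keyClass(Text.class),
   *     Writer.valueClass(IntWritable.class),
   *     Writer.compression(CompressionType.BLOCK, new DefaultCodec()));
   * }</pre>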
   * @param conf the configuration to use
   * @param opts the options to create the file with
   * @return a new Writer
   * @throws IOException
   */
  public static Writer createWriter(Configuration conf, Writer.Option... opts
                                    ) throws IOException {
    Writer.CompressionOption compressionOption = 
      Options.getOption(Writer.CompressionOption.class, opts);
    CompressionType kind;
    if (compressionOption != null) {
      kind = compressionOption.getValue();
    } else {
      kind = getDefaultCompressionType(conf);
      opts = Options.prependOptions(opts, Writer.compression(kind));
    }
    switch (kind) {
      default:
      case NONE:
        return new Writer(conf, opts);
      case RECORD:
        return new RecordCompressWriter(conf, opts);
      case BLOCK:
        return new BlockCompressWriter(conf, opts);
    }
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType) throws IOException {
    return createWriter(conf, Writer.filesystem(fs),
                        Writer.file(name), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer 
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, CompressionType compressionType, 
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec));
  }
  
  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name,
                 Class keyClass, Class valClass, int bufferSize,
                 short replication, long blockSize,
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.bufferSize(bufferSize), 
                        Writer.replication(replication),
                        Writer.blockSize(blockSize),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress),
                        Writer.metadata(metadata));
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param bufferSize buffer size for the underlying output stream.
   * @param replication replication factor for the file.
   * @param blockSize block size for the file.
   * @param createParent create parent directory if non-existent
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  @Deprecated
  public static Writer
  createWriter(FileSystem fs, Configuration conf, Path name,
               Class keyClass, Class valClass, int bufferSize,
               short replication, long blockSize, boolean createParent,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata) throws IOException {
    return createWriter(FileContext.getFileContext(fs.getUri(), conf),
        conf, name, keyClass, valClass, compressionType, codec,
        metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
        CreateOpts.bufferSize(bufferSize),
        createParent ? CreateOpts.createParent()
                     : CreateOpts.donotCreateParent(),
        CreateOpts.repFac(replication),
        CreateOpts.blockSize(blockSize)
      );
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fc The context for the specified file.
   * @param conf The configuration.
   * @param name The name of the file.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @param createFlag gives the semantics of create: overwrite, append etc.
   * @param opts file creation options; see {@link CreateOpts}.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   */
  public static Writer
  createWriter(FileContext fc, Configuration conf, Path name,
               Class keyClass, Class valClass,
               CompressionType compressionType, CompressionCodec codec,
               Metadata metadata,
               final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
               throws IOException {
    return createWriter(conf, fc.create(name, createFlag, opts),
          keyClass, valClass, compressionType, codec, metadata).ownStream();
  }

  /**
   * Construct the preferred type of SequenceFile Writer.
   * @param fs The configured filesystem. 
   * @param conf The configuration.
   * @param name The name of the file. 
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param progress The Progressable object to track progress.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(FileSystem fs, Configuration conf, Path name, 
                 Class keyClass, Class valClass, 
                 CompressionType compressionType, CompressionCodec codec,
                 Progressable progress) throws IOException {
    return createWriter(conf, Writer.file(name),
                        Writer.filesystem(fs),
                        Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec),
                        Writer.progressable(progress));
  }

  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @param metadata The metadata of the file.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass,
                 CompressionType compressionType,
                 CompressionCodec codec, Metadata metadata) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass), 
                        Writer.compression(compressionType, codec),
                        Writer.metadata(metadata));
  }
  
  /**
   * Construct the preferred type of 'raw' SequenceFile Writer.
   * @param conf The configuration.
   * @param out The stream on top of which the writer is to be constructed.
   * @param keyClass The 'key' type.
   * @param valClass The 'value' type.
   * @param compressionType The compression type.
   * @param codec The compression codec.
   * @return Returns the handle to the constructed SequenceFile Writer.
   * @throws IOException
   * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
   *     instead.
   */
  @Deprecated
  public static Writer
    createWriter(Configuration conf, FSDataOutputStream out, 
                 Class keyClass, Class valClass, CompressionType compressionType,
                 CompressionCodec codec) throws IOException {
    return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
                        Writer.valueClass(valClass),
                        Writer.compression(compressionType, codec));
  }
  

  /** The interface to 'raw' values of SequenceFiles. */
  public static interface ValueBytes {

    /** Writes the uncompressed bytes to the outStream.
     * @param outStream Stream to write uncompressed bytes into.
     * @throws IOException
     */
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException;

    /** Write compressed bytes to outStream. 
     * Note that this will NOT compress the bytes if they are not already
     * compressed.
     * @param outStream Stream to write compressed bytes into.
     */
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException;

    /**
     * Size of stored data.
     * @return the size of the stored data in bytes
     */
    public int getSize();
  }
  
  private static class UncompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    
    private UncompressedBytes() {
      data = null;
      dataSize = 0;
    }
    
    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      }
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      outStream.write(data, 0, dataSize);
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      throw 
        new IllegalArgumentException("UncompressedBytes cannot be compressed!");
    }

  } // UncompressedBytes
  
  private static class CompressedBytes implements ValueBytes {
    private int dataSize;
    private byte[] data;
    DataInputBuffer rawData = null;
    CompressionCodec codec = null;
    CompressionInputStream decompressedStream = null;

    private CompressedBytes(CompressionCodec codec) {
      data = null;
      dataSize = 0;
      this.codec = codec;
    }

    private void reset(DataInputStream in, int length) throws IOException {
      if (data == null) {
        data = new byte[length];
      } else if (length > data.length) {
        data = new byte[Math.max(length, data.length * 2)];
      } 
      dataSize = -1;
      in.readFully(data, 0, length);
      dataSize = length;
    }
    
    @Override
    public int getSize() {
      return dataSize;
    }
    
    @Override
    public void writeUncompressedBytes(DataOutputStream outStream)
      throws IOException {
      if (decompressedStream == null) {
        rawData = new DataInputBuffer();
        decompressedStream = codec.createInputStream(rawData);
      } else {
        decompressedStream.resetState();
      }
      rawData.reset(data, 0, dataSize);

      byte[] buffer = new byte[8192];
      int bytesRead = 0;
      while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
        outStream.write(buffer, 0, bytesRead);
      }
    }

    @Override
    public void writeCompressedBytes(DataOutputStream outStream) 
      throws IllegalArgumentException, IOException {
      outStream.write(data, 0, dataSize);
    }

  } // CompressedBytes
  
  /**
   * The class encapsulating the metadata of a file.
   * The metadata of a file is a list of attribute name/value
   * pairs of Text type.
   *
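   * <p>A short usage sketch:</p>
   * <pre>{@code
   * Metadata metadata = new Metadata();
   * metadata.set(new Text("created.by"), new Text("example"));
   * Text createdBy = metadata.get(new Text("created.by"));
   * }</pre>
   *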
   */
  public static class Metadata implements Writable {

    private TreeMap<Text, Text> theMetadata;
    
    public Metadata() {
      this(new TreeMap<Text, Text>());
    }
    
    public Metadata(TreeMap<Text, Text> arg) {
      if (arg == null) {
        this.theMetadata = new TreeMap<Text, Text>();
      } else {
        this.theMetadata = arg;
      }
    }
    
    public Text get(Text name) {
      return this.theMetadata.get(name);
    }
    
    public void set(Text name, Text value) {
      this.theMetadata.put(name, value);
    }
    
    public TreeMap<Text, Text> getMetadata() {
      return new TreeMap<Text, Text>(this.theMetadata);
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(this.theMetadata.size());
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        en.getKey().write(out);
        en.getValue().write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int sz = in.readInt();
      if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
      this.theMetadata = new TreeMap<Text, Text>();
      for (int i = 0; i < sz; i++) {
        Text key = new Text();
        Text val = new Text();
        key.readFields(in);
        val.readFields(in);
        this.theMetadata.put(key, val);
      }
    }

    @Override
    public boolean equals(Object other) {
      if (other == null) {
        return false;
      }
      if (other.getClass() != this.getClass()) {
        return false;
      } else {
        return equals((Metadata)other);
      }
    }
    
    public boolean equals(Metadata other) {
      if (other == null) return false;
      if (this.theMetadata.size() != other.theMetadata.size()) {
        return false;
      }
      Iterator<Map.Entry<Text, Text>> iter1 =
        this.theMetadata.entrySet().iterator();
      Iterator<Map.Entry<Text, Text>> iter2 =
        other.theMetadata.entrySet().iterator();
      while (iter1.hasNext() && iter2.hasNext()) {
        Map.Entry<Text, Text> en1 = iter1.next();
        Map.Entry<Text, Text> en2 = iter2.next();
        if (!en1.getKey().equals(en2.getKey())) {
          return false;
        }
        if (!en1.getValue().equals(en2.getValue())) {
          return false;
        }
      }
      if (iter1.hasNext() || iter2.hasNext()) {
        return false;
      }
      return true;
    }

    @Override
    public int hashCode() {
      assert false : "hashCode not designed";
      return 42; // any arbitrary constant will do 
    }
    
    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("size: ").append(this.theMetadata.size()).append("\n");
      Iterator<Map.Entry<Text, Text>> iter =
        this.theMetadata.entrySet().iterator();
      while (iter.hasNext()) {
        Map.Entry<Text, Text> en = iter.next();
        sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
        sb.append("\n");
      }
      return sb.toString();
    }
  }
  
  /** Write key/value pairs to a sequence-format file. */
  public static class Writer implements java.io.Closeable, Syncable {
    private Configuration conf;
    FSDataOutputStream out;
    boolean ownOutputStream = true;
    DataOutputBuffer buffer = new DataOutputBuffer();

    Class keyClass;
    Class valClass;

    private final CompressionType compress;
    CompressionCodec codec = null;
    CompressionOutputStream deflateFilter = null;
    DataOutputStream deflateOut = null;
    Metadata metadata = null;
    Compressor compressor = null;
    
    protected Serializer keySerializer;
    protected Serializer uncompressedValSerializer;
    protected Serializer compressedValSerializer;
    
    // Insert a globally unique 16-byte value every few entries, so that one
    // can seek into the middle of a file and then synchronize with record
    // starts and ends by scanning for this value.
    long lastSyncPos;                     // position of last sync
    byte[] sync;                          // 16 random bytes
    {
      try {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        long time = Time.now();
        digester.update((new UID()+"@"+time).getBytes());
        sync = digester.digest();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }

    public static interface Option {}
    
    static class FileOption extends Options.PathOption 
                                    implements Option {
      FileOption(Path path) {
        super(path);
      }
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static class FileSystemOption implements Option {
      private final FileSystem value;
      protected FileSystemOption(FileSystem value) {
        this.value = value;
      }
      public FileSystem getValue() {
        return value;
      }
    }

    static class StreamOption extends Options.FSDataOutputStreamOption 
                              implements Option {
      StreamOption(FSDataOutputStream stream) {
        super(stream);
      }
    }

    static class BufferSizeOption extends Options.IntegerOption
                                  implements Option {
      BufferSizeOption(int value) {
        super(value);
      }
    }
    
    static class BlockSizeOption extends Options.LongOption implements Option {
      BlockSizeOption(long value) {
        super(value);
      }
    }

    static class ReplicationOption extends Options.IntegerOption
                                   implements Option {
      ReplicationOption(int value) {
        super(value);
      }
    }

    static class KeyClassOption extends Options.ClassOption implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    static class ValueClassOption extends Options.ClassOption
                                          implements Option {
      ValueClassOption(Class<?> value) {
        super(value);
      }
    }

    static class MetadataOption implements Option {
      private final Metadata value;
      MetadataOption(Metadata value) {
        this.value = value;
      }
      Metadata getValue() {
        return value;
      }
    }

    static class ProgressableOption extends Options.ProgressableOption
                                    implements Option {
      ProgressableOption(Progressable value) {
        super(value);
      }
    }

    private static class CompressionOption implements Option {
      private final CompressionType value;
      private final CompressionCodec codec;
      CompressionOption(CompressionType value) {
        this(value, null);
      }
      CompressionOption(CompressionType value, CompressionCodec codec) {
        this.value = value;
        this.codec = (CompressionType.NONE != value && null == codec)
          ? new DefaultCodec()
          : codec;
      }
      CompressionType getValue() {
        return value;
      }
      CompressionCodec getCodec() {
        return codec;
      }
    }
    
    public static Option file(Path value) {
      return new FileOption(value);
    }

    /**
     * @deprecated only used for backwards-compatibility in the createWriter methods
     * that take FileSystem.
     */
    @Deprecated
    private static Option filesystem(FileSystem fs) {
      return new SequenceFile.Writer.FileSystemOption(fs);
    }
    
    public static Option bufferSize(int value) {
      return new BufferSizeOption(value);
    }
    
    public static Option stream(FSDataOutputStream value) {
      return new StreamOption(value);
    }
    
    public static Option replication(short value) {
      return new ReplicationOption(value);
    }
    
    public static Option blockSize(long value) {
      return new BlockSizeOption(value);
    }
    
    public static Option progressable(Progressable value) {
      return new ProgressableOption(value);
    }

    public static Option keyClass(Class<?> value) {
      return new KeyClassOption(value);
    }
    
    public static Option valueClass(Class<?> value) {
      return new ValueClassOption(value);
    }
    
    public static Option metadata(Metadata value) {
      return new MetadataOption(value);
    }

    public static Option compression(CompressionType value) {
      return new CompressionOption(value);
    }

    public static Option compression(CompressionType value,
        CompressionCodec codec) {
      return new CompressionOption(value, codec);
    }
    
    /**
     * Construct an uncompressed writer from a set of options.
     * @param conf the configuration to use
     * @param opts the options used when creating the writer
     * @throws IOException if it fails
     */
    Writer(Configuration conf, 
           Option... opts) throws IOException {
      BlockSizeOption blockSizeOption = 
        Options.getOption(BlockSizeOption.class, opts);
      BufferSizeOption bufferSizeOption = 
        Options.getOption(BufferSizeOption.class, opts);
      ReplicationOption replicationOption = 
        Options.getOption(ReplicationOption.class, opts);
      ProgressableOption progressOption = 
        Options.getOption(ProgressableOption.class, opts);
      FileOption fileOption = Options.getOption(FileOption.class, opts);
      FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
      StreamOption streamOption = Options.getOption(StreamOption.class, opts);
      KeyClassOption keyClassOption = 
        Options.getOption(KeyClassOption.class, opts);
      ValueClassOption valueClassOption = 
        Options.getOption(ValueClassOption.class, opts);
      MetadataOption metadataOption = 
        Options.getOption(MetadataOption.class, opts);
      CompressionOption compressionTypeOption =
        Options.getOption(CompressionOption.class, opts);
      // check consistency of options
      if ((fileOption == null) == (streamOption == null)) {
        throw new IllegalArgumentException(
            "exactly one of file or stream must be specified");
      }
      if (fileOption == null && (blockSizeOption != null ||
                                 bufferSizeOption != null ||
                                 replicationOption != null ||
                                 progressOption != null)) {
        throw new IllegalArgumentException("file modifier options not " +
                                           "compatible with stream");
      }

      FSDataOutputStream out;
      boolean ownStream = fileOption != null;
      if (ownStream) {
        Path p = fileOption.getValue();
        FileSystem fs;
        if (fsOption != null) {
          fs = fsOption.getValue();
        } else {
          fs = p.getFileSystem(conf);
        }
        int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
          bufferSizeOption.getValue();
        short replication = replicationOption == null ? 
          fs.getDefaultReplication(p) :
          (short) replicationOption.getValue();
        long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
          blockSizeOption.getValue();
        Progressable progress = progressOption == null ? null :
          progressOption.getValue();
        out = fs.create(p, true, bufferSize, replication, blockSize, progress);
      } else {
        out = streamOption.getValue();
      }
      Class<?> keyClass = keyClassOption == null ?
          Object.class : keyClassOption.getValue();
      Class<?> valueClass = valueClassOption == null ?
          Object.class : valueClassOption.getValue();
      Metadata metadata = metadataOption == null ?
          new Metadata() : metadataOption.getValue();
      this.compress = compressionTypeOption.getValue();
      final CompressionCodec codec = compressionTypeOption.getCodec();
      if (codec != null &&
          (codec instanceof GzipCodec) &&
          !NativeCodeLoader.isNativeCodeLoaded() &&
          !ZlibFactory.isNativeZlibLoaded(conf)) {
        throw new IllegalArgumentException("SequenceFile doesn't work with " +
                                           "GzipCodec without native-hadoop " +
                                           "code!");
      }
      init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
    }

    /** Create the named file.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name), true, keyClass, valClass, null, 
           new Metadata());
    }
    
    /** Create the named file with write-progress reporter.
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name, 
                  Class keyClass, Class valClass,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf, fs.create(name, progress), true, keyClass, valClass,
           null, metadata);
    }
    
    /** Create the named file with write-progress reporter. 
     * @deprecated Use 
     *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
     *   instead.
     */
    @Deprecated
    public Writer(FileSystem fs, Configuration conf, Path name,
                  Class keyClass, Class valClass,
                  int bufferSize, short replication, long blockSize,
                  Progressable progress, Metadata metadata) throws IOException {
      this.compress = CompressionType.NONE;
      init(conf,
           fs.create(name, true, bufferSize, replication, blockSize, progress),
           true, keyClass, valClass, null, metadata);
    }

    boolean isCompressed() { return compress != CompressionType.NONE; }
    boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
    
    Writer ownStream() { this.ownOutputStream = true; return this; }

    /** Write and flush the file header. */
    private void writeFileHeader() 
      throws IOException {
      out.write(VERSION);
      Text.writeString(out, keyClass.getName());
      Text.writeString(out, valClass.getName());
      
      out.writeBoolean(this.isCompressed());
      out.writeBoolean(this.isBlockCompressed());
      
      if (this.isCompressed()) {
        Text.writeString(out, (codec.getClass()).getName());
      }
      this.metadata.write(out);
      out.write(sync);                       // write the sync bytes
      out.flush();                           // flush header
    }
    
    /** Initialize. */
    @SuppressWarnings("unchecked")
    void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
              Class keyClass, Class valClass,
              CompressionCodec codec, Metadata metadata) 
      throws IOException {
      this.conf = conf;
      this.out = out;
      this.ownOutputStream = ownStream;
      this.keyClass = keyClass;
      this.valClass = valClass;
      this.codec = codec;
      this.metadata = metadata;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      if (this.keySerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Key class: '"
                + keyClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.keySerializer.open(buffer);
      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
      if (this.uncompressedValSerializer == null) {
        throw new IOException(
            "Could not find a serializer for the Value class: '"
                + valClass.getCanonicalName() + "'. "
                + "Please ensure that the configuration '" +
                CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                + "properly configured, if you're using "
                + "custom serialization.");
      }
      this.uncompressedValSerializer.open(buffer);
      if (this.codec != null) {
        ReflectionUtils.setConf(this.codec, this.conf);
        this.compressor = CodecPool.getCompressor(this.codec);
        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
        this.deflateOut = 
          new DataOutputStream(new BufferedOutputStream(deflateFilter));
        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
        if (this.compressedValSerializer == null) {
          throw new IOException(
              "Could not find a serializer for the Value class: '"
                  + valClass.getCanonicalName() + "'. "
                  + "Please ensure that the configuration '" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
                  + "properly configured, if you're using "
                  + "custom serialization.");
        }
        this.compressedValSerializer.open(deflateOut);
      }
      writeFileHeader();
    }
    
    /** Returns the class of keys in this file. */
    public Class getKeyClass() { return keyClass; }

    /** Returns the class of values in this file. */
    public Class getValueClass() { return valClass; }

    /** Returns the compression codec of data in this file. */
    public CompressionCodec getCompressionCodec() { return codec; }
    
    /** Create a sync point. */
    public void sync() throws IOException {
      if (sync != null && lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
        out.write(sync);                          // write sync
        lastSyncPos = out.getPos();               // update lastSyncPos
      }
    }

    /**
     * Flush all currently written data to the file system.
     * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
     */
    @Deprecated
    public void syncFs() throws IOException {
      if (out != null) {
        out.sync();                               // flush contents to file system
      }
    }
1245    
1246        @Override
1247        public void hsync() throws IOException {
1248          if (out != null) {
1249            out.hsync();
1250          }
1251        }
1252    
1253        @Override
1254        public void hflush() throws IOException {
1255          if (out != null) {
1256            out.hflush();
1257          }
1258        }
1259        
1260        /** Returns the configuration of this file. */
1261        Configuration getConf() { return conf; }
1262        
1263        /** Close the file. */
1264        @Override
1265        public synchronized void close() throws IOException {
1266          keySerializer.close();
1267          uncompressedValSerializer.close();
1268          if (compressedValSerializer != null) {
1269            compressedValSerializer.close();
1270          }
1271    
1272          CodecPool.returnCompressor(compressor);
1273          compressor = null;
1274          
1275          if (out != null) {
1276            
1277            // Close the underlying stream iff we own it...
1278            if (ownOutputStream) {
1279              out.close();
1280            } else {
1281              out.flush();
1282            }
1283            out = null;
1284          }
1285        }
1286    
1287        synchronized void checkAndWriteSync() throws IOException {
1288          if (sync != null &&
1289              out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
1290            sync();
1291          }
1292        }
1293    
1294        /** Append a key/value pair. */
1295        public void append(Writable key, Writable val)
1296          throws IOException {
1297          append((Object) key, (Object) val);
1298        }
1299    
1300        /** Append a key/value pair. */
1301        @SuppressWarnings("unchecked")
1302        public synchronized void append(Object key, Object val)
1303          throws IOException {
1304          if (key.getClass() != keyClass)
1305            throw new IOException("wrong key class: "+key.getClass().getName()
1306                                  +" is not "+keyClass);
1307          if (val.getClass() != valClass)
1308            throw new IOException("wrong value class: "+val.getClass().getName()
1309                                  +" is not "+valClass);
1310    
1311          buffer.reset();
1312    
1313          // Append the 'key'
1314          keySerializer.serialize(key);
1315          int keyLength = buffer.getLength();
1316          if (keyLength < 0)
1317            throw new IOException("negative length keys not allowed: " + key);
1318    
1319          // Append the 'value'
1320          if (compress == CompressionType.RECORD) {
1321            deflateFilter.resetState();
1322            compressedValSerializer.serialize(val);
1323            deflateOut.flush();
1324            deflateFilter.finish();
1325          } else {
1326            uncompressedValSerializer.serialize(val);
1327          }
1328    
1329          // Write the record out
1330          checkAndWriteSync();                                // sync
1331          out.writeInt(buffer.getLength());                   // total record length
1332          out.writeInt(keyLength);                            // key portion length
1333          out.write(buffer.getData(), 0, buffer.getLength()); // data
1334        }
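
    // On-disk layout of a single uncompressed record, as written above and
    // by appendRaw() below (lengths are 4-byte ints, per DataOutput):
    //
    //   recordLength (int)    keyLength + valueLength
    //   keyLength    (int)
    //   keyBytes     byte[keyLength]
    //   valueBytes   byte[recordLength - keyLength]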
1335    
1336        public synchronized void appendRaw(byte[] keyData, int keyOffset,
1337            int keyLength, ValueBytes val) throws IOException {
1338          if (keyLength < 0)
1339            throw new IOException("negative length keys not allowed: " + keyLength);
1340    
1341          int valLength = val.getSize();
1342    
1343          checkAndWriteSync();
1344          
1345          out.writeInt(keyLength+valLength);          // total record length
1346          out.writeInt(keyLength);                    // key portion length
1347          out.write(keyData, keyOffset, keyLength);   // key
1348          val.writeUncompressedBytes(out);            // value
1349        }
1350    
1351        /** Returns the current length of the output file.
1352         *
     * <p>This always returns a synchronized position.  In other words,
     * immediately after calling {@link SequenceFile.Reader#seek(long)} with
     * a position returned by this method,
     * {@link SequenceFile.Reader#next(Writable)} may be called.  However,
     * the key may be earlier in the file than the key last written when this
     * method was called (e.g., with block-compression, it may be the first
     * key in the block that was being written when this method was called).
1359         */
1360        public synchronized long getLength() throws IOException {
1361          return out.getPos();
1362        }
1363    
1364      } // class Writer
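
  // A minimal usage sketch for the Writer above (illustrative only, not part
  // of the public API). It assumes the createWriter(Configuration,
  // Writer.Option...) factory and the Writer.file/keyClass/valueClass option
  // factories defined earlier in this file; the path is purely illustrative.
  private static void demoWriterUsage(Configuration conf) throws IOException {
    Path path = new Path("/tmp/demo.seq");              // illustrative path
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        Writer.file(path),
        Writer.keyClass(Text.class),
        Writer.valueClass(IntWritable.class));
    try {
      for (int i = 0; i < 10; i++) {
        writer.append(new Text("key-" + i), new IntWritable(i));
      }
      writer.hflush();                                  // expose to readers
    } finally {
      writer.close();                                   // returns the compressor
    }
  }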
1365    
1366      /** Write key/compressed-value pairs to a sequence-format file. */
1367      static class RecordCompressWriter extends Writer {
1368        
1369        RecordCompressWriter(Configuration conf, 
1370                             Option... options) throws IOException {
1371          super(conf, options);
1372        }
1373    
1374        /** Append a key/value pair. */
1375        @Override
1376        @SuppressWarnings("unchecked")
1377        public synchronized void append(Object key, Object val)
1378          throws IOException {
1379          if (key.getClass() != keyClass)
1380            throw new IOException("wrong key class: "+key.getClass().getName()
1381                                  +" is not "+keyClass);
1382          if (val.getClass() != valClass)
1383            throw new IOException("wrong value class: "+val.getClass().getName()
1384                                  +" is not "+valClass);
1385    
1386          buffer.reset();
1387    
1388          // Append the 'key'
1389          keySerializer.serialize(key);
1390          int keyLength = buffer.getLength();
1391          if (keyLength < 0)
1392            throw new IOException("negative length keys not allowed: " + key);
1393    
1394          // Compress 'value' and append it
1395          deflateFilter.resetState();
1396          compressedValSerializer.serialize(val);
1397          deflateOut.flush();
1398          deflateFilter.finish();
1399    
1400          // Write the record out
1401          checkAndWriteSync();                                // sync
1402          out.writeInt(buffer.getLength());                   // total record length
1403          out.writeInt(keyLength);                            // key portion length
1404          out.write(buffer.getData(), 0, buffer.getLength()); // data
1405        }
1406    
    /** Append a raw key/value pair. */
1408        @Override
1409        public synchronized void appendRaw(byte[] keyData, int keyOffset,
1410            int keyLength, ValueBytes val) throws IOException {
1411    
1412          if (keyLength < 0)
1413            throw new IOException("negative length keys not allowed: " + keyLength);
1414    
1415          int valLength = val.getSize();
1416          
1417          checkAndWriteSync();                        // sync
1418          out.writeInt(keyLength+valLength);          // total record length
1419          out.writeInt(keyLength);                    // key portion length
1420          out.write(keyData, keyOffset, keyLength);   // 'key' data
1421          val.writeCompressedBytes(out);              // 'value' data
1422        }
1423        
  } // RecordCompressWriter
1425    
1426      /** Write compressed key/value blocks to a sequence-format file. */
1427      static class BlockCompressWriter extends Writer {
1428        
1429        private int noBufferedRecords = 0;
1430        
1431        private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1432        private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1433    
1434        private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1435        private DataOutputBuffer valBuffer = new DataOutputBuffer();
1436    
1437        private final int compressionBlockSize;
1438        
1439        BlockCompressWriter(Configuration conf,
1440                            Option... options) throws IOException {
1441          super(conf, options);
1442          compressionBlockSize = 
1443            conf.getInt("io.seqfile.compress.blocksize", 1000000);
1444          keySerializer.close();
1445          keySerializer.open(keyBuffer);
1446          uncompressedValSerializer.close();
1447          uncompressedValSerializer.open(valBuffer);
1448        }
1449    
    /** Compress the given buffer and write out the compressed length
     *  followed by the compressed data. */
1451        private synchronized 
1452          void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
1453          throws IOException {
1454          deflateFilter.resetState();
1455          buffer.reset();
1456          deflateOut.write(uncompressedDataBuffer.getData(), 0, 
1457                           uncompressedDataBuffer.getLength());
1458          deflateOut.flush();
1459          deflateFilter.finish();
1460          
1461          WritableUtils.writeVInt(out, buffer.getLength());
1462          out.write(buffer.getData(), 0, buffer.getLength());
1463        }
1464        
    /** Compress and flush buffered records to the underlying file system. */
1466        @Override
1467        public synchronized void sync() throws IOException {
1468          if (noBufferedRecords > 0) {
1469            super.sync();
1470            
1471            // No. of records
1472            WritableUtils.writeVInt(out, noBufferedRecords);
1473            
1474            // Write 'keys' and lengths
1475            writeBuffer(keyLenBuffer);
1476            writeBuffer(keyBuffer);
1477            
1478            // Write 'values' and lengths
1479            writeBuffer(valLenBuffer);
1480            writeBuffer(valBuffer);
1481            
1482            // Flush the file-stream
1483            out.flush();
1484            
1485            // Reset internal states
1486            keyLenBuffer.reset();
1487            keyBuffer.reset();
1488            valLenBuffer.reset();
1489            valBuffer.reset();
1490            noBufferedRecords = 0;
1491          }
1492          
1493        }
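
    // On-disk layout of one compressed block, as emitted by sync() above:
    //
    //   sync marker (written by super.sync())
    //   number of records         (vint)
    //   compressed key lengths    (vint byte-count + bytes)
    //   compressed keys           (vint byte-count + bytes)
    //   compressed value lengths  (vint byte-count + bytes)
    //   compressed values         (vint byte-count + bytes)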
1494        
1495        /** Close the file. */
1496        @Override
1497        public synchronized void close() throws IOException {
1498          if (out != null) {
1499            sync();
1500          }
1501          super.close();
1502        }
1503    
1504        /** Append a key/value pair. */
1505        @Override
1506        @SuppressWarnings("unchecked")
1507        public synchronized void append(Object key, Object val)
1508          throws IOException {
1509          if (key.getClass() != keyClass)
1510            throw new IOException("wrong key class: "+key+" is not "+keyClass);
1511          if (val.getClass() != valClass)
1512            throw new IOException("wrong value class: "+val+" is not "+valClass);
1513    
1514          // Save key/value into respective buffers 
1515          int oldKeyLength = keyBuffer.getLength();
1516          keySerializer.serialize(key);
1517          int keyLength = keyBuffer.getLength() - oldKeyLength;
1518          if (keyLength < 0)
1519            throw new IOException("negative length keys not allowed: " + key);
1520          WritableUtils.writeVInt(keyLenBuffer, keyLength);
1521    
1522          int oldValLength = valBuffer.getLength();
1523          uncompressedValSerializer.serialize(val);
1524          int valLength = valBuffer.getLength() - oldValLength;
1525          WritableUtils.writeVInt(valLenBuffer, valLength);
1526          
1527          // Added another key/value pair
1528          ++noBufferedRecords;
1529          
1530          // Compress and flush?
1531          int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1532          if (currentBlockSize >= compressionBlockSize) {
1533            sync();
1534          }
1535        }
1536        
    /** Append a raw key/value pair. */
1538        @Override
1539        public synchronized void appendRaw(byte[] keyData, int keyOffset,
1540            int keyLength, ValueBytes val) throws IOException {
1541          
1542          if (keyLength < 0)
1543            throw new IOException("negative length keys not allowed");
1544    
1545          int valLength = val.getSize();
1546          
1547          // Save key/value data in relevant buffers
1548          WritableUtils.writeVInt(keyLenBuffer, keyLength);
1549          keyBuffer.write(keyData, keyOffset, keyLength);
1550          WritableUtils.writeVInt(valLenBuffer, valLength);
1551          val.writeUncompressedBytes(valBuffer);
1552    
1553          // Added another key/value pair
1554          ++noBufferedRecords;
1555    
1556          // Compress and flush?
1557          int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1558          if (currentBlockSize >= compressionBlockSize) {
1559            sync();
1560          }
1561        }
1562      
  } // BlockCompressWriter
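
  // Sketch of requesting a block-compressed writer; it assumes the
  // Writer.compression(CompressionType, CompressionCodec) option factory
  // defined earlier in this file:
  //
  //   SequenceFile.Writer w = SequenceFile.createWriter(conf,
  //       Writer.file(path),
  //       Writer.keyClass(Text.class),
  //       Writer.valueClass(Text.class),
  //       Writer.compression(CompressionType.BLOCK, new DefaultCodec()));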
1564    
1565      /** Get the configured buffer size */
1566      private static int getBufferSize(Configuration conf) {
1567        return conf.getInt("io.file.buffer.size", 4096);
1568      }
1569    
1570      /** Reads key/value pairs from a sequence-format file. */
1571      public static class Reader implements java.io.Closeable {
1572        private String filename;
1573        private FSDataInputStream in;
1574        private DataOutputBuffer outBuf = new DataOutputBuffer();
1575    
1576        private byte version;
1577    
1578        private String keyClassName;
1579        private String valClassName;
1580        private Class keyClass;
1581        private Class valClass;
1582    
1583        private CompressionCodec codec = null;
1584        private Metadata metadata = null;
1585        
1586        private byte[] sync = new byte[SYNC_HASH_SIZE];
1587        private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
1588        private boolean syncSeen;
1589    
1590        private long headerEnd;
1591        private long end;
1592        private int keyLength;
1593        private int recordLength;
1594    
1595        private boolean decompress;
1596        private boolean blockCompressed;
1597        
1598        private Configuration conf;
1599    
1600        private int noBufferedRecords = 0;
1601        private boolean lazyDecompress = true;
1602        private boolean valuesDecompressed = true;
1603        
1604        private int noBufferedKeys = 0;
1605        private int noBufferedValues = 0;
1606        
1607        private DataInputBuffer keyLenBuffer = null;
1608        private CompressionInputStream keyLenInFilter = null;
1609        private DataInputStream keyLenIn = null;
1610        private Decompressor keyLenDecompressor = null;
1611        private DataInputBuffer keyBuffer = null;
1612        private CompressionInputStream keyInFilter = null;
1613        private DataInputStream keyIn = null;
1614        private Decompressor keyDecompressor = null;
1615    
1616        private DataInputBuffer valLenBuffer = null;
1617        private CompressionInputStream valLenInFilter = null;
1618        private DataInputStream valLenIn = null;
1619        private Decompressor valLenDecompressor = null;
1620        private DataInputBuffer valBuffer = null;
1621        private CompressionInputStream valInFilter = null;
1622        private DataInputStream valIn = null;
1623        private Decompressor valDecompressor = null;
1624        
1625        private Deserializer keyDeserializer;
1626        private Deserializer valDeserializer;
1627    
1628        /**
1629         * A tag interface for all of the Reader options
1630         */
1631        public static interface Option {}
1632        
1633        /**
1634         * Create an option to specify the path name of the sequence file.
1635         * @param value the path to read
1636         * @return a new option
1637         */
1638        public static Option file(Path value) {
1639          return new FileOption(value);
1640        }
1641        
1642        /**
1643         * Create an option to specify the stream with the sequence file.
1644         * @param value the stream to read.
1645         * @return a new option
1646         */
1647        public static Option stream(FSDataInputStream value) {
1648          return new InputStreamOption(value);
1649        }
1650        
1651        /**
1652         * Create an option to specify the starting byte to read.
1653         * @param value the number of bytes to skip over
1654         * @return a new option
1655         */
1656        public static Option start(long value) {
1657          return new StartOption(value);
1658        }
1659        
1660        /**
1661         * Create an option to specify the number of bytes to read.
1662         * @param value the number of bytes to read
1663         * @return a new option
1664         */
1665        public static Option length(long value) {
1666          return new LengthOption(value);
1667        }
1668        
1669        /**
1670         * Create an option with the buffer size for reading the given pathname.
1671         * @param value the number of bytes to buffer
1672         * @return a new option
1673         */
1674        public static Option bufferSize(int value) {
1675          return new BufferSizeOption(value);
1676        }
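
    // The option factories above compose, for example:
    //
    //   new Reader(conf, Reader.file(path), Reader.start(16),
    //              Reader.length(4096), Reader.bufferSize(65536));
    //
    // Note that bufferSize() is only valid together with file(); see the
    // consistency checks in the constructor below.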
1677    
1678        private static class FileOption extends Options.PathOption 
1679                                        implements Option {
1680          private FileOption(Path value) {
1681            super(value);
1682          }
1683        }
1684        
1685        private static class InputStreamOption
1686            extends Options.FSDataInputStreamOption 
1687            implements Option {
1688          private InputStreamOption(FSDataInputStream value) {
1689            super(value);
1690          }
1691        }
1692    
1693        private static class StartOption extends Options.LongOption
1694                                         implements Option {
1695          private StartOption(long value) {
1696            super(value);
1697          }
1698        }
1699    
1700        private static class LengthOption extends Options.LongOption
1701                                          implements Option {
1702          private LengthOption(long value) {
1703            super(value);
1704          }
1705        }
1706    
1707        private static class BufferSizeOption extends Options.IntegerOption
1708                                          implements Option {
1709          private BufferSizeOption(int value) {
1710            super(value);
1711          }
1712        }
1713    
    // only used internally, never via a public option factory
1715        private static class OnlyHeaderOption extends Options.BooleanOption 
1716                                              implements Option {
1717          private OnlyHeaderOption() {
1718            super(true);
1719          }
1720        }
1721    
1722        public Reader(Configuration conf, Option... opts) throws IOException {
1723          // Look up the options, these are null if not set
1724          FileOption fileOpt = Options.getOption(FileOption.class, opts);
1725          InputStreamOption streamOpt = 
1726            Options.getOption(InputStreamOption.class, opts);
1727          StartOption startOpt = Options.getOption(StartOption.class, opts);
1728          LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1729          BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1730          OnlyHeaderOption headerOnly = 
1731            Options.getOption(OnlyHeaderOption.class, opts);
1732          // check for consistency
      if ((fileOpt == null) == (streamOpt == null)) {
        throw new IllegalArgumentException(
            "Exactly one of the file or stream options must be specified");
      }
1737          if (fileOpt == null && bufOpt != null) {
1738            throw new IllegalArgumentException("buffer size can only be set when" +
1739                                               " a file is specified.");
1740          }
1741          // figure out the real values
1742          Path filename = null;
1743          FSDataInputStream file;
1744          final long len;
1745          if (fileOpt != null) {
1746            filename = fileOpt.getValue();
1747            FileSystem fs = filename.getFileSystem(conf);
1748            int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1749            len = null == lenOpt
1750              ? fs.getFileStatus(filename).getLen()
1751              : lenOpt.getValue();
1752            file = openFile(fs, filename, bufSize, len);
1753          } else {
1754            len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1755            file = streamOpt.getValue();
1756          }
1757          long start = startOpt == null ? 0 : startOpt.getValue();
1758          // really set up
1759          initialize(filename, file, start, len, conf, headerOnly != null);
1760        }
1761    
1762        /**
1763         * Construct a reader by opening a file from the given file system.
1764         * @param fs The file system used to open the file.
1765         * @param file The file being read.
1766         * @param conf Configuration
1767         * @throws IOException
1768         * @deprecated Use Reader(Configuration, Option...) instead.
1769         */
1770        @Deprecated
1771        public Reader(FileSystem fs, Path file, 
1772                      Configuration conf) throws IOException {
1773          this(conf, file(file.makeQualified(fs)));
1774        }
1775    
1776        /**
1777         * Construct a reader by the given input stream.
1778         * @param in An input stream.
1779         * @param buffersize unused
1780         * @param start The starting position.
1781         * @param length The length being read.
1782         * @param conf Configuration
1783         * @throws IOException
1784         * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1785         */
1786        @Deprecated
1787        public Reader(FSDataInputStream in, int buffersize,
1788            long start, long length, Configuration conf) throws IOException {
1789          this(conf, stream(in), start(start), length(length));
1790        }
1791    
1792        /** Common work of the constructors. */
1793        private void initialize(Path filename, FSDataInputStream in,
1794                                long start, long length, Configuration conf,
1795                                boolean tempReader) throws IOException {
1796          if (in == null) {
1797            throw new IllegalArgumentException("in == null");
1798          }
1799          this.filename = filename == null ? "<unknown>" : filename.toString();
1800          this.in = in;
1801          this.conf = conf;
1802          boolean succeeded = false;
1803          try {
1804            seek(start);
1805            this.end = this.in.getPos() + length;
1806            // if it wrapped around, use the max
1807            if (end < length) {
1808              end = Long.MAX_VALUE;
1809            }
1810            init(tempReader);
1811            succeeded = true;
1812          } finally {
1813            if (!succeeded) {
1814              IOUtils.cleanup(LOG, this.in);
1815            }
1816          }
1817        }
1818    
1819        /**
1820         * Override this method to specialize the type of
1821         * {@link FSDataInputStream} returned.
1822         * @param fs The file system used to open the file.
1823         * @param file The file being read.
1824         * @param bufferSize The buffer size used to read the file.
1825         * @param length The length being read if it is >= 0.  Otherwise,
1826         *               the length is not available.
1827         * @return The opened stream.
1828         * @throws IOException
1829         */
1830        protected FSDataInputStream openFile(FileSystem fs, Path file,
1831            int bufferSize, long length) throws IOException {
1832          return fs.open(file, bufferSize);
1833        }
1834        
1835        /**
1836         * Initialize the {@link Reader}
     * @param tempReader <code>true</code> if we are constructing a temporary
     *                  reader (see {@link SequenceFile.Sorter#cloneFileAttributes}),
1839         *                  and hence do not initialize every component; 
1840         *                  <code>false</code> otherwise.
1841         * @throws IOException
1842         */
1843        private void init(boolean tempReader) throws IOException {
1844          byte[] versionBlock = new byte[VERSION.length];
1845          in.readFully(versionBlock);
1846    
1847          if ((versionBlock[0] != VERSION[0]) ||
1848              (versionBlock[1] != VERSION[1]) ||
1849              (versionBlock[2] != VERSION[2]))
1850            throw new IOException(this + " not a SequenceFile");
1851    
1852          // Set 'version'
1853          version = versionBlock[3];
1854          if (version > VERSION[3])
1855            throw new VersionMismatchException(VERSION[3], version);
1856    
1857          if (version < BLOCK_COMPRESS_VERSION) {
1858            UTF8 className = new UTF8();
1859    
1860            className.readFields(in);
1861            keyClassName = className.toStringChecked(); // key class name
1862    
1863            className.readFields(in);
1864            valClassName = className.toStringChecked(); // val class name
1865          } else {
1866            keyClassName = Text.readString(in);
1867            valClassName = Text.readString(in);
1868          }
1869    
1870          if (version > 2) {                          // if version > 2
1871            this.decompress = in.readBoolean();       // is compressed?
1872          } else {
1873            decompress = false;
1874          }
1875    
1876          if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1877            this.blockCompressed = in.readBoolean();  // is block-compressed?
1878          } else {
1879            blockCompressed = false;
1880          }
1881          
1882          // if version >= 5
1883          // setup the compression codec
1884          if (decompress) {
1885            if (version >= CUSTOM_COMPRESS_VERSION) {
1886              String codecClassname = Text.readString(in);
1887              try {
1888                Class<? extends CompressionCodec> codecClass
1889                  = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1890                this.codec = ReflectionUtils.newInstance(codecClass, conf);
1891              } catch (ClassNotFoundException cnfe) {
1892                throw new IllegalArgumentException("Unknown codec: " + 
1893                                                   codecClassname, cnfe);
1894              }
1895            } else {
1896              codec = new DefaultCodec();
1897              ((Configurable)codec).setConf(conf);
1898            }
1899          }
1900          
1901          this.metadata = new Metadata();
1902          if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1903            this.metadata.readFields(in);
1904          }
1905          
1906          if (version > 1) {                          // if version > 1
1907            in.readFully(sync);                       // read sync bytes
1908            headerEnd = in.getPos();                  // record end of header
1909          }
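
      // Header layout parsed above, for the latest version:
      //   3-byte magic "SEQ" plus a version byte
      //   key class name, value class name
      //   compression flag (version > 2), block-compression flag (>= 4)
      //   codec class name (version >= 5, only if compressed)
      //   metadata (version >= 6)
      //   sync marker, SYNC_HASH_SIZE bytes (version > 1)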
1910          
      // Initialize... *not* if we are constructing a temporary Reader
1912          if (!tempReader) {
1913            valBuffer = new DataInputBuffer();
1914            if (decompress) {
1915              valDecompressor = CodecPool.getDecompressor(codec);
1916              valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1917              valIn = new DataInputStream(valInFilter);
1918            } else {
1919              valIn = valBuffer;
1920            }
1921    
1922            if (blockCompressed) {
1923              keyLenBuffer = new DataInputBuffer();
1924              keyBuffer = new DataInputBuffer();
1925              valLenBuffer = new DataInputBuffer();
1926    
1927              keyLenDecompressor = CodecPool.getDecompressor(codec);
1928              keyLenInFilter = codec.createInputStream(keyLenBuffer, 
1929                                                       keyLenDecompressor);
1930              keyLenIn = new DataInputStream(keyLenInFilter);
1931    
1932              keyDecompressor = CodecPool.getDecompressor(codec);
1933              keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
1934              keyIn = new DataInputStream(keyInFilter);
1935    
1936              valLenDecompressor = CodecPool.getDecompressor(codec);
1937              valLenInFilter = codec.createInputStream(valLenBuffer, 
1938                                                       valLenDecompressor);
1939              valLenIn = new DataInputStream(valLenInFilter);
1940            }
1941            
1942            SerializationFactory serializationFactory =
1943              new SerializationFactory(conf);
1944            this.keyDeserializer =
1945              getDeserializer(serializationFactory, getKeyClass());
1946            if (this.keyDeserializer == null) {
1947              throw new IOException(
1948                  "Could not find a deserializer for the Key class: '"
1949                      + getKeyClass().getCanonicalName() + "'. "
1950                      + "Please ensure that the configuration '" +
1951                      CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1952                      + "properly configured, if you're using "
1953                      + "custom serialization.");
1954            }
1955            if (!blockCompressed) {
1956              this.keyDeserializer.open(valBuffer);
1957            } else {
1958              this.keyDeserializer.open(keyIn);
1959            }
1960            this.valDeserializer =
1961              getDeserializer(serializationFactory, getValueClass());
1962            if (this.valDeserializer == null) {
1963              throw new IOException(
1964                  "Could not find a deserializer for the Value class: '"
1965                      + getValueClass().getCanonicalName() + "'. "
1966                      + "Please ensure that the configuration '" +
1967                      CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1968                      + "properly configured, if you're using "
1969                      + "custom serialization.");
1970            }
1971            this.valDeserializer.open(valIn);
1972          }
1973        }
1974        
1975        @SuppressWarnings("unchecked")
1976        private Deserializer getDeserializer(SerializationFactory sf, Class c) {
1977          return sf.getDeserializer(c);
1978        }
1979        
1980        /** Close the file. */
1981        @Override
1982        public synchronized void close() throws IOException {
1983          // Return the decompressors to the pool
1984          CodecPool.returnDecompressor(keyLenDecompressor);
1985          CodecPool.returnDecompressor(keyDecompressor);
1986          CodecPool.returnDecompressor(valLenDecompressor);
1987          CodecPool.returnDecompressor(valDecompressor);
1988          keyLenDecompressor = keyDecompressor = null;
1989          valLenDecompressor = valDecompressor = null;
1990          
1991          if (keyDeserializer != null) {
1992            keyDeserializer.close();
1993          }
1994          if (valDeserializer != null) {
1995            valDeserializer.close();
1996          }
1997          
1998          // Close the input-stream
1999          in.close();
2000        }
2001    
2002        /** Returns the name of the key class. */
2003        public String getKeyClassName() {
2004          return keyClassName;
2005        }
2006    
2007        /** Returns the class of keys in this file. */
2008        public synchronized Class<?> getKeyClass() {
2009          if (null == keyClass) {
2010            try {
2011              keyClass = WritableName.getClass(getKeyClassName(), conf);
2012            } catch (IOException e) {
2013              throw new RuntimeException(e);
2014            }
2015          }
2016          return keyClass;
2017        }
2018    
2019        /** Returns the name of the value class. */
2020        public String getValueClassName() {
2021          return valClassName;
2022        }
2023    
2024        /** Returns the class of values in this file. */
2025        public synchronized Class<?> getValueClass() {
2026          if (null == valClass) {
2027            try {
2028              valClass = WritableName.getClass(getValueClassName(), conf);
2029            } catch (IOException e) {
2030              throw new RuntimeException(e);
2031            }
2032          }
2033          return valClass;
2034        }
2035    
2036        /** Returns true if values are compressed. */
2037        public boolean isCompressed() { return decompress; }
2038        
2039        /** Returns true if records are block-compressed. */
2040        public boolean isBlockCompressed() { return blockCompressed; }
2041        
2042        /** Returns the compression codec of data in this file. */
2043        public CompressionCodec getCompressionCodec() { return codec; }
2044        
2045        /**
2046         * Get the compression type for this file.
2047         * @return the compression type
2048         */
2049        public CompressionType getCompressionType() {
2050          if (decompress) {
2051            return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2052          } else {
2053            return CompressionType.NONE;
2054          }
2055        }
2056    
2057        /** Returns the metadata object of the file */
2058        public Metadata getMetadata() {
2059          return this.metadata;
2060        }
2061        
2062        /** Returns the configuration used for this file. */
2063        Configuration getConf() { return conf; }
2064        
2065        /** Read a compressed buffer */
2066        private synchronized void readBuffer(DataInputBuffer buffer, 
2067                                             CompressionInputStream filter) throws IOException {
2068          // Read data into a temporary buffer
2069          DataOutputBuffer dataBuffer = new DataOutputBuffer();
2070    
2071          try {
2072            int dataBufferLength = WritableUtils.readVInt(in);
2073            dataBuffer.write(in, dataBufferLength);
2074          
2075            // Set up 'buffer' connected to the input-stream
2076            buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2077          } finally {
2078            dataBuffer.close();
2079          }
2080    
2081          // Reset the codec
2082          filter.resetState();
2083        }
2084        
2085        /** Read the next 'compressed' block */
2086        private synchronized void readBlock() throws IOException {
2087          // Check if we need to throw away a whole block of 
2088          // 'values' due to 'lazy decompression' 
2089          if (lazyDecompress && !valuesDecompressed) {
2090            in.seek(WritableUtils.readVInt(in)+in.getPos());
2091            in.seek(WritableUtils.readVInt(in)+in.getPos());
2092          }
2093          
2094          // Reset internal states
2095          noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
2096          valuesDecompressed = false;
2097    
2098          //Process sync
2099          if (sync != null) {
2100            in.readInt();
2101            in.readFully(syncCheck);                // read syncCheck
2102            if (!Arrays.equals(sync, syncCheck))    // check it
2103              throw new IOException("File is corrupt!");
2104          }
2105          syncSeen = true;
2106    
2107          // Read number of records in this block
2108          noBufferedRecords = WritableUtils.readVInt(in);
2109          
2110          // Read key lengths and keys
2111          readBuffer(keyLenBuffer, keyLenInFilter);
2112          readBuffer(keyBuffer, keyInFilter);
2113          noBufferedKeys = noBufferedRecords;
2114          
2115          // Read value lengths and values
2116          if (!lazyDecompress) {
2117            readBuffer(valLenBuffer, valLenInFilter);
2118            readBuffer(valBuffer, valInFilter);
2119            noBufferedValues = noBufferedRecords;
2120            valuesDecompressed = true;
2121          }
2122        }
2123    
2124        /** 
2125         * Position valLenIn/valIn to the 'value' 
2126         * corresponding to the 'current' key 
2127         */
2128        private synchronized void seekToCurrentValue() throws IOException {
2129          if (!blockCompressed) {
2130            if (decompress) {
2131              valInFilter.resetState();
2132            }
2133            valBuffer.reset();
2134          } else {
2135            // Check if this is the first value in the 'block' to be read
2136            if (lazyDecompress && !valuesDecompressed) {
2137              // Read the value lengths and values
2138              readBuffer(valLenBuffer, valLenInFilter);
2139              readBuffer(valBuffer, valInFilter);
2140              noBufferedValues = noBufferedRecords;
2141              valuesDecompressed = true;
2142            }
2143            
2144            // Calculate the no. of bytes to skip
2145            // Note: 'current' key has already been read!
2146            int skipValBytes = 0;
2147            int currentKey = noBufferedKeys + 1;          
2148            for (int i=noBufferedValues; i > currentKey; --i) {
2149              skipValBytes += WritableUtils.readVInt(valLenIn);
2150              --noBufferedValues;
2151            }
2152            
2153            // Skip to the 'val' corresponding to 'current' key
2154            if (skipValBytes > 0) {
2155              if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2156                throw new IOException("Failed to seek to " + currentKey + 
2157                                      "(th) value!");
2158              }
2159            }
2160          }
2161        }
2162    
2163        /**
2164         * Get the 'value' corresponding to the last read 'key'.
2165         * @param val : The 'value' to be read.
2166         * @throws IOException
2167         */
2168        public synchronized void getCurrentValue(Writable val) 
2169          throws IOException {
2170          if (val instanceof Configurable) {
2171            ((Configurable) val).setConf(this.conf);
2172          }
2173    
2174          // Position stream to 'current' value
2175          seekToCurrentValue();
2176    
2177          if (!blockCompressed) {
2178            val.readFields(valIn);
2179            
2180            if (valIn.read() > 0) {
2181              LOG.info("available bytes: " + valIn.available());
2182              throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2183                                    + " bytes, should read " +
2184                                    (valBuffer.getLength()-keyLength));
2185            }
2186          } else {
2187            // Get the value
2188            int valLength = WritableUtils.readVInt(valLenIn);
2189            val.readFields(valIn);
2190            
2191            // Read another compressed 'value'
2192            --noBufferedValues;
2193            
2194            // Sanity check
2195            if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " is a negative-length value");
2197            }
2198          }
2199    
2200        }
2201        
2202        /**
2203         * Get the 'value' corresponding to the last read 'key'.
2204         * @param val : The 'value' to be read.
2205         * @throws IOException
2206         */
2207        public synchronized Object getCurrentValue(Object val) 
2208          throws IOException {
2209          if (val instanceof Configurable) {
2210            ((Configurable) val).setConf(this.conf);
2211          }
2212    
2213          // Position stream to 'current' value
2214          seekToCurrentValue();
2215    
2216          if (!blockCompressed) {
2217            val = deserializeValue(val);
2218            
2219            if (valIn.read() > 0) {
2220              LOG.info("available bytes: " + valIn.available());
2221              throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2222                                    + " bytes, should read " +
2223                                    (valBuffer.getLength()-keyLength));
2224            }
2225          } else {
2226            // Get the value
2227            int valLength = WritableUtils.readVInt(valLenIn);
2228            val = deserializeValue(val);
2229            
2230            // Read another compressed 'value'
2231            --noBufferedValues;
2232            
2233            // Sanity check
2234            if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " is a negative-length value");
2236            }
2237          }
2238          return val;
2239    
2240        }
2241    
2242        @SuppressWarnings("unchecked")
2243        private Object deserializeValue(Object val) throws IOException {
2244          return valDeserializer.deserialize(val);
2245        }
2246        
    /** Read the next key in the file into <code>key</code>, skipping its
     * value.  Returns true if another entry exists, and false at the end of
     * the file. */
2249        public synchronized boolean next(Writable key) throws IOException {
2250          if (key.getClass() != getKeyClass())
2251            throw new IOException("wrong key class: "+key.getClass().getName()
2252                                  +" is not "+keyClass);
2253    
2254          if (!blockCompressed) {
2255            outBuf.reset();
2256            
2257            keyLength = next(outBuf);
2258            if (keyLength < 0)
2259              return false;
2260            
2261            valBuffer.reset(outBuf.getData(), outBuf.getLength());
2262            
2263            key.readFields(valBuffer);
2264            valBuffer.mark(0);
2265            if (valBuffer.getPosition() != keyLength)
2266              throw new IOException(key + " read " + valBuffer.getPosition()
2267                                    + " bytes, should read " + keyLength);
2268          } else {
2269            //Reset syncSeen
2270            syncSeen = false;
2271            
2272            if (noBufferedKeys == 0) {
2273              try {
2274                readBlock();
2275              } catch (EOFException eof) {
2276                return false;
2277              }
2278            }
2279            
2280            int keyLength = WritableUtils.readVInt(keyLenIn);
2281            
2282            // Sanity check
2283            if (keyLength < 0) {
2284              return false;
2285            }
2286            
2287            //Read another compressed 'key'
2288            key.readFields(keyIn);
2289            --noBufferedKeys;
2290          }
2291    
2292          return true;
2293        }
2294    
2295        /** Read the next key/value pair in the file into <code>key</code> and
2296         * <code>val</code>.  Returns true if such a pair exists and false when at
2297         * end of file */
2298        public synchronized boolean next(Writable key, Writable val)
2299          throws IOException {
2300          if (val.getClass() != getValueClass())
2301            throw new IOException("wrong value class: "+val+" is not "+valClass);
2302    
2303          boolean more = next(key);
2304          
2305          if (more) {
2306            getCurrentValue(val);
2307          }
2308    
2309          return more;
2310        }
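
    // A minimal usage sketch for the typed read path above (illustrative
    // only, not part of the public API). The key/value classes recorded in
    // the file header are instantiated via ReflectionUtils; the path is
    // purely illustrative.
    private static void demoReaderUsage(Configuration conf)
        throws IOException {
      Reader reader = new Reader(conf, file(new Path("/tmp/demo.seq")));
      try {
        Writable key = (Writable)
          ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        Writable val = (Writable)
          ReflectionUtils.newInstance(reader.getValueClass(), conf);
        while (reader.next(key, val)) {
          System.out.println(key + "\t" + val);
        }
      } finally {
        reader.close();
      }
    }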
2311        
2312        /**
2313         * Read and return the next record length, potentially skipping over 
2314         * a sync block.
2315         * @return the length of the next record or -1 if there is no next record
2316         * @throws IOException
2317         */
2318        private synchronized int readRecordLength() throws IOException {
2319          if (in.getPos() >= end) {
2320            return -1;
2321          }      
2322          int length = in.readInt();
2323          if (version > 1 && sync != null &&
2324              length == SYNC_ESCAPE) {              // process a sync entry
2325            in.readFully(syncCheck);                // read syncCheck
2326            if (!Arrays.equals(sync, syncCheck))    // check it
2327              throw new IOException("File is corrupt!");
2328            syncSeen = true;
2329            if (in.getPos() >= end) {
2330              return -1;
2331            }
2332            length = in.readInt();                  // re-read length
2333          } else {
2334            syncSeen = false;
2335          }
2336          
2337          return length;
2338        }
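
    // In non-block-compressed files a sync point appears in the record
    // stream as SYNC_ESCAPE in place of the record length, followed by the
    // SYNC_HASH_SIZE-byte sync marker; readRecordLength() above skips it
    // transparently and re-reads the true length.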
2339        
    /** Read the next key/value pair in the file into <code>buffer</code>.
     * Returns the length of the key read, or -1 if at end of file.  The length
     * of the value may be computed by calling buffer.getLength() before and
     * after calls to this method.
     * @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}.
     */
    @Deprecated
2346        synchronized int next(DataOutputBuffer buffer) throws IOException {
2347          // Unsupported for block-compressed sequence files
2348          if (blockCompressed) {
        throw new IOException("Unsupported call for block-compressed" +
                              " SequenceFiles - use SequenceFile.Reader." +
                              "nextRaw(DataOutputBuffer, ValueBytes)");
2351          }
2352          try {
2353            int length = readRecordLength();
2354            if (length == -1) {
2355              return -1;
2356            }
2357            int keyLength = in.readInt();
2358            buffer.write(in, length);
2359            return keyLength;
2360          } catch (ChecksumException e) {             // checksum failure
2361            handleChecksumException(e);
2362            return next(buffer);
2363          }
2364        }
2365    
2366        public ValueBytes createValueBytes() {
2367          ValueBytes val = null;
2368          if (!decompress || blockCompressed) {
2369            val = new UncompressedBytes();
2370          } else {
2371            val = new CompressedBytes(codec);
2372          }
2373          return val;
2374        }
2375    
2376        /**
2377         * Read 'raw' records.
2378         * @param key - The buffer into which the key is read
2379         * @param val - The 'raw' value
2380         * @return Returns the total record length or -1 for end of file
2381         * @throws IOException
2382         */
2383        public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2384          throws IOException {
2385          if (!blockCompressed) {
2386            int length = readRecordLength();
2387            if (length == -1) {
2388              return -1;
2389            }
2390            int keyLength = in.readInt();
2391            int valLength = length - keyLength;
2392            key.write(in, keyLength);
2393            if (decompress) {
2394              CompressedBytes value = (CompressedBytes)val;
2395              value.reset(in, valLength);
2396            } else {
2397              UncompressedBytes value = (UncompressedBytes)val;
2398              value.reset(in, valLength);
2399            }
2400            
2401            return length;
2402          } else {
2403            //Reset syncSeen
2404            syncSeen = false;
2405            
2406            // Read 'key'
2407            if (noBufferedKeys == 0) {
2408              if (in.getPos() >= end) 
2409                return -1;
2410    
2411              try { 
2412                readBlock();
2413              } catch (EOFException eof) {
2414                return -1;
2415              }
2416            }
2417            int keyLength = WritableUtils.readVInt(keyLenIn);
2418            if (keyLength < 0) {
          throw new IOException("negative-length key found!");
2420            }
2421            key.write(keyIn, keyLength);
2422            --noBufferedKeys;
2423            
2424            // Read raw 'value'
2425            seekToCurrentValue();
2426            int valLength = WritableUtils.readVInt(valLenIn);
2427            UncompressedBytes rawValue = (UncompressedBytes)val;
2428            rawValue.reset(valIn, valLength);
2429            --noBufferedValues;
2430            
2431            return (keyLength+valLength);
2432          }
2433          
2434        }
2435    
2436        /**
2437         * Read 'raw' keys.
2438         * @param key - The buffer into which the key is read
2439         * @return Returns the key length or -1 for end of file
2440         * @throws IOException
2441         */
2442        public synchronized int nextRawKey(DataOutputBuffer key) 
2443          throws IOException {
2444          if (!blockCompressed) {
2445            recordLength = readRecordLength();
2446            if (recordLength == -1) {
2447              return -1;
2448            }
2449            keyLength = in.readInt();
2450            key.write(in, keyLength);
2451            return keyLength;
2452          } else {
2453            //Reset syncSeen
2454            syncSeen = false;
2455            
2456            // Read 'key'
2457            if (noBufferedKeys == 0) {
2458              if (in.getPos() >= end) 
2459                return -1;
2460    
2461              try { 
2462                readBlock();
2463              } catch (EOFException eof) {
2464                return -1;
2465              }
2466            }
2467            int keyLength = WritableUtils.readVInt(keyLenIn);
2468            if (keyLength < 0) {
          throw new IOException("negative-length key found!");
2470            }
2471            key.write(keyIn, keyLength);
2472            --noBufferedKeys;
2473            
2474            return keyLength;
2475          }
2476          
2477        }
2478    
2479        /** Read the next key in the file, skipping its
2480         * value.  Return null at end of file. */
2481        public synchronized Object next(Object key) throws IOException {
2482          if (key != null && key.getClass() != getKeyClass()) {
2483            throw new IOException("wrong key class: "+key.getClass().getName()
2484                                  +" is not "+keyClass);
2485          }
2486    
2487          if (!blockCompressed) {
2488            outBuf.reset();
2489            
2490            keyLength = next(outBuf);
2491            if (keyLength < 0)
2492              return null;
2493            
2494            valBuffer.reset(outBuf.getData(), outBuf.getLength());
2495            
2496            key = deserializeKey(key);
2497            valBuffer.mark(0);
2498            if (valBuffer.getPosition() != keyLength)
2499              throw new IOException(key + " read " + valBuffer.getPosition()
2500                                    + " bytes, should read " + keyLength);
2501          } else {
2502            //Reset syncSeen
2503            syncSeen = false;
2504            
2505            if (noBufferedKeys == 0) {
2506              try {
2507                readBlock();
2508              } catch (EOFException eof) {
2509                return null;
2510              }
2511            }
2512            
2513            int keyLength = WritableUtils.readVInt(keyLenIn);
2514            
2515            // Sanity check
2516            if (keyLength < 0) {
2517              return null;
2518            }
2519            
2520            //Read another compressed 'key'
2521            key = deserializeKey(key);
2522            --noBufferedKeys;
2523          }
2524    
2525          return key;
2526        }
2527    
2528        @SuppressWarnings("unchecked")
2529        private Object deserializeKey(Object key) throws IOException {
2530          return keyDeserializer.deserialize(key);
2531        }
2532    
2533        /**
2534         * Read 'raw' values.
2535         * @param val - The 'raw' value
2536         * @return Returns the value length
2537         * @throws IOException
2538         */
2539        public synchronized int nextRawValue(ValueBytes val) 
2540          throws IOException {
2541          
2542          // Position stream to current value
2543          seekToCurrentValue();
2544     
2545          if (!blockCompressed) {
2546            int valLength = recordLength - keyLength;
2547            if (decompress) {
2548              CompressedBytes value = (CompressedBytes)val;
2549              value.reset(in, valLength);
2550            } else {
2551              UncompressedBytes value = (UncompressedBytes)val;
2552              value.reset(in, valLength);
2553            }
2554             
2555            return valLength;
2556          } else {
2557            int valLength = WritableUtils.readVInt(valLenIn);
2558            UncompressedBytes rawValue = (UncompressedBytes)val;
2559            rawValue.reset(valIn, valLength);
2560            --noBufferedValues;
2561            return valLength;
2562          }
2563          
2564        }
2565    
2566        private void handleChecksumException(ChecksumException e)
2567          throws IOException {
2568          if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2569            LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2570            sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2571          } else {
2572            throw e;
2573          }
2574        }
2575    
    /** Disables sync marker detection; often invoked for temporary files. */
2577        synchronized void ignoreSync() {
2578          sync = null;
2579        }
2580        
2581        /** Set the current byte position in the input file.
2582         *
2583         * <p>The position passed must be a position returned by {@link
2584         * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2585         * position, use {@link SequenceFile.Reader#sync(long)}.
2586         */
2587        public synchronized void seek(long position) throws IOException {
2588          in.seek(position);
2589          if (blockCompressed) {                      // trigger block read
2590            noBufferedKeys = 0;
2591            valuesDecompressed = true;
2592          }
2593        }
2594    
2595        /** Seek to the next sync mark past a given position.*/
2596        public synchronized void sync(long position) throws IOException {
2597          if (position+SYNC_SIZE >= end) {
2598            seek(end);
2599            return;
2600          }
2601    
2602          if (position < headerEnd) {
2603            // seek directly to first record
2604            in.seek(headerEnd);
2605            // note the sync marker "seen" in the header
2606            syncSeen = true;
2607            return;
2608          }
2609    
2610          try {
2611            seek(position+4);                         // skip escape
2612            in.readFully(syncCheck);
2613            int syncLen = sync.length;
2614            for (int i = 0; in.getPos() < end; i++) {
2615              int j = 0;
2616              for (; j < syncLen; j++) {
2617                if (sync[j] != syncCheck[(i+j)%syncLen])
2618                  break;
2619              }
2620              if (j == syncLen) {
2621                in.seek(in.getPos() - SYNC_SIZE);     // position before sync
2622                return;
2623              }
2624              syncCheck[i%syncLen] = in.readByte();
2625            }
2626          } catch (ChecksumException e) {             // checksum failure
2627            handleChecksumException(e);
2628          }
2629        }
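
    // Sketch of the split-reading pattern built on sync(long): seek to the
    // first sync point at or after the split start, then consume records
    // until the position passes the end of the split (simplified):
    //
    //   reader.sync(start);
    //   while (reader.next(key, val) && reader.getPosition() < start + len) {
    //     ...                          // process one record
    //   }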
2630    
2631        /** Returns true iff the previous call to next passed a sync mark.*/
2632        public synchronized boolean syncSeen() { return syncSeen; }
2633    
2634        /** Return the current byte position in the input file. */
2635        public synchronized long getPosition() throws IOException {
2636          return in.getPos();
2637        }
2638    
2639        /** Returns the name of the file. */
2640        @Override
2641        public String toString() {
2642          return filename;
2643        }
2644    
2645      }
2646    
2647      /** Sorts key/value pairs in a sequence-format file.
2648       *
2649       * <p>For best performance, applications should make sure that the {@link
2650       * Writable#readFields(DataInput)} implementation of their keys is
2651       * very efficient.  In particular, it should avoid allocating memory.
2652       */
2653      public static class Sorter {
2654    
2655        private RawComparator comparator;
2656    
2657        private MergeSort mergeSort; //the implementation of merge sort
2658        
2659        private Path[] inFiles;                     // when merging or sorting
2660    
2661        private Path outFile;
2662    
2663        private int memory; // bytes
2664        private int factor; // merged per pass
2665    
2666        private FileSystem fs = null;
2667    
2668        private Class keyClass;
2669        private Class valClass;
2670    
2671        private Configuration conf;
2672        private Metadata metadata;
2673        
2674        private Progressable progressable = null;
2675    
2676        /** Sort and merge files containing the named classes. */
2677        public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
2678                      Class valClass, Configuration conf)  {
2679          this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
2680        }
2681    
2682        /** Sort and merge using an arbitrary {@link RawComparator}. */
2683        public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
2684                      Class valClass, Configuration conf) {
2685          this(fs, comparator, keyClass, valClass, conf, new Metadata());
2686        }
2687    
2688        /** Sort and merge using an arbitrary {@link RawComparator}. */
2689        public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
2690                      Class valClass, Configuration conf, Metadata metadata) {
2691          this.fs = fs;
2692          this.comparator = comparator;
2693          this.keyClass = keyClass;
2694          this.valClass = valClass;
2695          this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
2696          this.factor = conf.getInt("io.sort.factor", 100);
2697          this.conf = conf;
2698          this.metadata = metadata;
2699        }
2700    
2701        /** Set the number of streams to merge at once.*/
2702        public void setFactor(int factor) { this.factor = factor; }
2703    
2704        /** Get the number of streams to merge at once.*/
2705        public int getFactor() { return factor; }
2706    
2707        /** Set the total amount of buffer memory, in bytes.*/
2708        public void setMemory(int memory) { this.memory = memory; }
2709    
2710        /** Get the total amount of buffer memory, in bytes.*/
2711        public int getMemory() { return memory; }
2712    
2713        /** Set the progressable object in order to report progress. */
2714        public void setProgressable(Progressable progressable) {
2715          this.progressable = progressable;
2716        }
2717        
2718        /** 
2719         * Perform a file sort from a set of input files into an output file.
2720         * @param inFiles the files to be sorted
2721         * @param outFile the sorted output file
2722         * @param deleteInput should the input files be deleted as they are read?
2723         */
2724        public void sort(Path[] inFiles, Path outFile,
2725                         boolean deleteInput) throws IOException {
2726          if (fs.exists(outFile)) {
2727            throw new IOException("already exists: " + outFile);
2728          }
2729    
2730          this.inFiles = inFiles;
2731          this.outFile = outFile;
2732    
2733          int segments = sortPass(deleteInput);
2734          if (segments > 1) {
2735            mergePass(outFile.getParent());
2736          }
2737        }
2738    
2739        /** 
2740         * Perform a file sort from a set of input files and return an iterator.
2741         * @param inFiles the files to be sorted
2742         * @param tempDir the directory where temp files are created during sort
2743         * @param deleteInput should the input files be deleted as they are read?
     * @return a RawKeyValueIterator over the sorted data
2745         */
2746        public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2747                                                  boolean deleteInput) throws IOException {
2748          Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2749          if (fs.exists(outFile)) {
2750            throw new IOException("already exists: " + outFile);
2751          }
2752          this.inFiles = inFiles;
      //outFile is used as a prefix for temp files when the sort produces
      //multiple sorted segments; in the single-segment case, outFile itself
      //contains the sorted data for that segment
2757          this.outFile = outFile;
2758    
2759          int segments = sortPass(deleteInput);
2760          if (segments > 1)
2761            return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2762                         tempDir);
2763          else if (segments == 1)
2764            return merge(new Path[]{outFile}, true, tempDir);
2765          else return null;
2766        }
2767    
2768        /**
2769         * The backwards compatible interface to sort.
2770         * @param inFile the input file to sort
2771         * @param outFile the sorted output file
2772         */
2773        public void sort(Path inFile, Path outFile) throws IOException {
2774          sort(new Path[]{inFile}, outFile, false);
2775        }
2776        
2777        private int sortPass(boolean deleteInput) throws IOException {
2778          if(LOG.isDebugEnabled()) {
2779            LOG.debug("running sort pass");
2780          }
2781          SortPass sortPass = new SortPass();         // make the SortPass
2782          sortPass.setProgressable(progressable);
2783          mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2784          try {
2785            return sortPass.run(deleteInput);         // run it
2786          } finally {
2787            sortPass.close();                         // close it
2788          }
2789        }
2790    
2791        private class SortPass {
2792          private int memoryLimit = memory/4;
2793          private int recordLimit = 1000000;
2794          
2795          private DataOutputBuffer rawKeys = new DataOutputBuffer();
2796          private byte[] rawBuffer;
2797    
2798          private int[] keyOffsets = new int[1024];
2799          private int[] pointers = new int[keyOffsets.length];
2800          private int[] pointersCopy = new int[keyOffsets.length];
2801          private int[] keyLengths = new int[keyOffsets.length];
2802          private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
2803          
      private ArrayList<Long> segmentLengths = new ArrayList<Long>();
2805          
2806          private Reader in = null;
2807          private FSDataOutputStream out = null;
2808          private FSDataOutputStream indexOut = null;
2809          private Path outName;
2810    
2811          private Progressable progressable = null;
2812    
2813          public int run(boolean deleteInput) throws IOException {
2814            int segments = 0;
2815            int currentFile = 0;
2816            boolean atEof = (currentFile >= inFiles.length);
2817            CompressionType compressionType;
2818            CompressionCodec codec = null;
2819            segmentLengths.clear();
2820            if (atEof) {
2821              return 0;
2822            }
2823            
2824            // Initialize
2825            in = new Reader(fs, inFiles[currentFile], conf);
2826            compressionType = in.getCompressionType();
2827            codec = in.getCompressionCodec();
2828            
2829            for (int i=0; i < rawValues.length; ++i) {
2830              rawValues[i] = null;
2831            }
2832            
2833            while (!atEof) {
2834              int count = 0;
2835              int bytesProcessed = 0;
2836              rawKeys.reset();
2837              while (!atEof && 
2838                     bytesProcessed < memoryLimit && count < recordLimit) {
2839    
2840                // Read a record into buffer
2841                // Note: Attempt to re-use 'rawValue' as far as possible
2842                int keyOffset = rawKeys.getLength();       
2843                ValueBytes rawValue = 
2844                  (count == keyOffsets.length || rawValues[count] == null) ? 
2845                  in.createValueBytes() : 
2846                  rawValues[count];
2847                int recordLength = in.nextRaw(rawKeys, rawValue);
2848                if (recordLength == -1) {
2849                  in.close();
2850                  if (deleteInput) {
2851                    fs.delete(inFiles[currentFile], true);
2852                  }
2853                  currentFile += 1;
2854                  atEof = currentFile >= inFiles.length;
2855                  if (!atEof) {
2856                    in = new Reader(fs, inFiles[currentFile], conf);
2857                  } else {
2858                    in = null;
2859                  }
2860                  continue;
2861                }
2862    
2863                int keyLength = rawKeys.getLength() - keyOffset;
2864    
2865                if (count == keyOffsets.length)
2866                  grow();
2867    
2868                keyOffsets[count] = keyOffset;                // update pointers
2869                pointers[count] = count;
2870                keyLengths[count] = keyLength;
2871                rawValues[count] = rawValue;
2872    
2873                bytesProcessed += recordLength; 
2874                count++;
2875              }
2876    
2877              // buffer is full -- sort & flush it
2878              if(LOG.isDebugEnabled()) {
2879                LOG.debug("flushing segment " + segments);
2880              }
2881              rawBuffer = rawKeys.getData();
2882              sort(count);
2883              // indicate we're making progress
2884              if (progressable != null) {
2885                progressable.progress();
2886              }
2887              flush(count, bytesProcessed, compressionType, codec, 
2888                    segments==0 && atEof);
2889              segments++;
2890            }
2891            return segments;
2892          }
2893    
2894          public void close() throws IOException {
2895            if (in != null) {
2896              in.close();
2897            }
2898            if (out != null) {
2899              out.close();
2900            }
2901            if (indexOut != null) {
2902              indexOut.close();
2903            }
2904          }
2905    
2906          private void grow() {
2907            int newLength = keyOffsets.length * 3 / 2;
2908            keyOffsets = grow(keyOffsets, newLength);
2909            pointers = grow(pointers, newLength);
2910            pointersCopy = new int[newLength];
2911            keyLengths = grow(keyLengths, newLength);
2912            rawValues = grow(rawValues, newLength);
2913          }
2914    
2915          private int[] grow(int[] old, int newLength) {
2916            int[] result = new int[newLength];
2917            System.arraycopy(old, 0, result, 0, old.length);
2918            return result;
2919          }
2920          
2921          private ValueBytes[] grow(ValueBytes[] old, int newLength) {
2922            ValueBytes[] result = new ValueBytes[newLength];
2923            System.arraycopy(old, 0, result, 0, old.length);
2924            for (int i=old.length; i < newLength; ++i) {
2925              result[i] = null;
2926            }
2927            return result;
2928          }
2929    
2930          private void flush(int count, int bytesProcessed, 
2931                             CompressionType compressionType, 
2932                             CompressionCodec codec, 
2933                             boolean done) throws IOException {
2934            if (out == null) {
2935              outName = done ? outFile : outFile.suffix(".0");
2936              out = fs.create(outName);
2937              if (!done) {
2938                indexOut = fs.create(outName.suffix(".index"));
2939              }
2940            }
2941    
2942            long segmentStart = out.getPos();
2943            Writer writer = createWriter(conf, Writer.stream(out), 
2944                Writer.keyClass(keyClass), Writer.valueClass(valClass),
2945                Writer.compression(compressionType, codec),
2946                Writer.metadata(done ? metadata : new Metadata()));
2947            
2948            if (!done) {
2949              writer.sync = null;                     // disable sync on temp files
2950            }
2951    
2952            for (int i = 0; i < count; i++) {         // write in sorted order
2953              int p = pointers[i];
2954              writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
2955            }
2956            writer.close();
2957            
2958            if (!done) {
2959              // Save the segment length
2960              WritableUtils.writeVLong(indexOut, segmentStart);
2961              WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
2962              indexOut.flush();
2963            }
2964          }
2965    
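      /** Indirect sort: permutes the <code>pointers</code> index array (using
       * <code>pointersCopy</code> as scratch space) so that the keys they
       * reference compare in ascending order; the raw key bytes in
       * <code>rawBuffer</code> are never moved.
       */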
2966          private void sort(int count) {
2967            System.arraycopy(pointers, 0, pointersCopy, 0, count);
2968            mergeSort.mergeSort(pointersCopy, pointers, 0, count);
2969          }
2970          class SeqFileComparator implements Comparator<IntWritable> {
2971            @Override
2972            public int compare(IntWritable I, IntWritable J) {
2973              return comparator.compare(rawBuffer, keyOffsets[I.get()], 
2974                                        keyLengths[I.get()], rawBuffer, 
2975                                        keyOffsets[J.get()], keyLengths[J.get()]);
2976            }
2977          }
2978          
      /** Set the progressable object in order to report progress. */
      public void setProgressable(Progressable progressable) {
        this.progressable = progressable;
      }
2984          
2985        } // SequenceFile.Sorter.SortPass
2986    
2987        /** The interface to iterate over raw keys/values of SequenceFiles. */
2988        public static interface RawKeyValueIterator {
      /** Gets the current raw key.
       * @return the current raw key as a DataOutputBuffer
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException; 
      /** Gets the current raw value.
       * @return the current raw value as ValueBytes 
       * @throws IOException
       */
      ValueBytes getValue() throws IOException; 
      /** Sets up the current key and value (for getKey and getValue).
       * @return true if there exists a key/value pair, false otherwise 
       * @throws IOException
       */
      boolean next() throws IOException;
      /** Closes the iterator so that the underlying streams can be closed.
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; its float value (0.0 - 1.0) indicates the
       * fraction of the bytes processed by the iterator so far.
       */
      Progress getProgress();
3012        }    
3013        
3014        /**
3015         * Merges the list of segments of type <code>SegmentDescriptor</code>
3016         * @param segments the list of SegmentDescriptors
3017         * @param tmpDir the directory to write temporary files into
3018         * @return RawKeyValueIterator
3019         * @throws IOException
3020         */
    public RawKeyValueIterator merge(List<SegmentDescriptor> segments, 
                                     Path tmpDir) 
3023          throws IOException {
3024          // pass in object to report progress, if present
3025          MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3026          return mQueue.merge();
3027        }
3028    
3029        /**
3030         * Merges the contents of files passed in Path[] using a max factor value
3031         * that is already set
3032         * @param inNames the array of path names
3033         * @param deleteInputs true if the input files should be deleted when 
3034         * unnecessary
3035         * @param tmpDir the directory to write temporary files into
3036         * @return RawKeyValueIteratorMergeQueue
3037         * @throws IOException
3038         */
3039        public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3040                                         Path tmpDir) 
3041          throws IOException {
3042          return merge(inNames, deleteInputs, 
3043                       (inNames.length < factor) ? inNames.length : factor,
3044                       tmpDir);
3045        }
3046    
3047        /**
3048         * Merges the contents of files passed in Path[]
3049         * @param inNames the array of path names
3050         * @param deleteInputs true if the input files should be deleted when 
3051         * unnecessary
3052         * @param factor the factor that will be used as the maximum merge fan-in
3053         * @param tmpDir the directory to write temporary files into
3054         * @return RawKeyValueIteratorMergeQueue
3055         * @throws IOException
3056         */
3057        public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3058                                         int factor, Path tmpDir) 
3059          throws IOException {
3060          //get the segments from inNames
      ArrayList<SegmentDescriptor> a = new ArrayList<SegmentDescriptor>();
3062          for (int i = 0; i < inNames.length; i++) {
3063            SegmentDescriptor s = new SegmentDescriptor(0,
3064                fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3065            s.preserveInput(!deleteInputs);
3066            s.doSync();
3067            a.add(s);
3068          }
3069          this.factor = factor;
3070          MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3071          return mQueue.merge();
3072        }
3073    
3074        /**
3075         * Merges the contents of files passed in Path[]
3076         * @param inNames the array of path names
3077         * @param tempDir the directory for creating temp files during merge
3078         * @param deleteInputs true if the input files should be deleted when 
3079         * unnecessary
3080         * @return RawKeyValueIteratorMergeQueue
3081         * @throws IOException
3082         */
3083        public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3084                                         boolean deleteInputs) 
3085          throws IOException {
      //outFile is used as a prefix for the temp files that hold the
      //intermediate merge outputs
3088          this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3089          //get the segments from inNames
      ArrayList<SegmentDescriptor> a = new ArrayList<SegmentDescriptor>();
3091          for (int i = 0; i < inNames.length; i++) {
3092            SegmentDescriptor s = new SegmentDescriptor(0,
3093                fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3094            s.preserveInput(!deleteInputs);
3095            s.doSync();
3096            a.add(s);
3097          }
3098          factor = (inNames.length < factor) ? inNames.length : factor;
3099          // pass in object to report progress, if present
3100          MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3101          return mQueue.merge();
3102        }
3103    
3104        /**
     * Clones the attributes (such as compression) of the input file and
     * creates a corresponding Writer
3107         * @param inputFile the path of the input file whose attributes should be 
3108         * cloned
3109         * @param outputFile the path of the output file 
3110         * @param prog the Progressable to report status during the file write
3111         * @return Writer
3112         * @throws IOException
3113         */
3114        public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3115                                          Progressable prog) throws IOException {
3116          Reader reader = new Reader(conf,
3117                                     Reader.file(inputFile),
3118                                     new Reader.OnlyHeaderOption());
3119          CompressionType compress = reader.getCompressionType();
3120          CompressionCodec codec = reader.getCompressionCodec();
3121          reader.close();
3122    
3123          Writer writer = createWriter(conf, 
3124                                       Writer.file(outputFile), 
3125                                       Writer.keyClass(keyClass), 
3126                                       Writer.valueClass(valClass), 
3127                                       Writer.compression(compress, codec), 
3128                                       Writer.progressable(prog));
3129          return writer;
3130        }
3131    
3132        /**
3133         * Writes records from RawKeyValueIterator into a file represented by the 
3134         * passed writer
3135         * @param records the RawKeyValueIterator
3136         * @param writer the Writer created earlier 
3137         * @throws IOException
3138         */
3139        public void writeFile(RawKeyValueIterator records, Writer writer) 
3140          throws IOException {
3141          while(records.next()) {
3142            writer.appendRaw(records.getKey().getData(), 0, 
3143                             records.getKey().getLength(), records.getValue());
3144          }
3145          writer.sync();
3146        }
3147            
3148        /** Merge the provided files.
3149         * @param inFiles the array of input path names
3150         * @param outFile the final output file
3151         * @throws IOException
3152         */
3153        public void merge(Path[] inFiles, Path outFile) throws IOException {
3154          if (fs.exists(outFile)) {
3155            throw new IOException("already exists: " + outFile);
3156          }
3157          RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3158          Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3159          
3160          writeFile(r, writer);
3161    
3162          writer.close();
3163        }
3164    
    /** Called by sort() to generate the final merged output. */
3166        private int mergePass(Path tmpDir) throws IOException {
3167          if(LOG.isDebugEnabled()) {
3168            LOG.debug("running merge pass");
3169          }
3170          Writer writer = cloneFileAttributes(
3171                                              outFile.suffix(".0"), outFile, null);
3172          RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3173                                        outFile.suffix(".0.index"), tmpDir);
3174          writeFile(r, writer);
3175    
3176          writer.close();
3177          return 0;
3178        }
3179    
3180        /** Used by mergePass to merge the output of the sort
3181         * @param inName the name of the input file containing sorted segments
3182         * @param indexIn the offsets of the sorted segments
3183         * @param tmpDir the relative directory to store intermediate results in
3184         * @return RawKeyValueIterator
3185         * @throws IOException
3186         */
3187        private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3188          throws IOException {
3189          //get the segments from indexIn
3190          //we create a SegmentContainer so that we can track segments belonging to
3191          //inName and delete inName as soon as we see that we have looked at all
3192          //the contained segments during the merge process & hence don't need 
3193          //them anymore
3194          SegmentContainer container = new SegmentContainer(inName, indexIn);
3195          MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3196          return mQueue.merge();
3197        }
3198        
3199        /** This class implements the core of the merge logic */
3200        private class MergeQueue extends PriorityQueue 
3201          implements RawKeyValueIterator {
3202          private boolean compress;
3203          private boolean blockCompress;
3204          private DataOutputBuffer rawKey = new DataOutputBuffer();
3205          private ValueBytes rawValue;
3206          private long totalBytesProcessed;
3207          private float progPerByte;
3208          private Progress mergeProgress = new Progress();
3209          private Path tmpDir;
3210          private Progressable progress = null; //handle to the progress reporting object
3211          private SegmentDescriptor minSegment;
3212          
      //a TreeMap used to store the segments sorted by size (segment offset
      //and segment path name are used to break ties between segments of the
      //same size)
      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
        new TreeMap<SegmentDescriptor, Void>();
3217                
3218          @SuppressWarnings("unchecked")
3219          public void put(SegmentDescriptor stream) throws IOException {
3220            if (size() == 0) {
3221              compress = stream.in.isCompressed();
3222              blockCompress = stream.in.isBlockCompressed();
3223            } else if (compress != stream.in.isCompressed() || 
3224                       blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException(
              "All merged files must use the same compression settings.");
3226            } 
3227            super.put(stream);
3228          }
3229          
3230          /**
3231           * A queue of file segments to merge
3232           * @param segments the file segments to merge
3233           * @param tmpDir a relative local directory to save intermediate files in
3234           * @param progress the reference to the Progressable object
3235           */
3236          public MergeQueue(List <SegmentDescriptor> segments,
3237              Path tmpDir, Progressable progress) {
3238            int size = segments.size();
3239            for (int i = 0; i < size; i++) {
3240              sortedSegmentSizes.put(segments.get(i), null);
3241            }
3242            this.tmpDir = tmpDir;
3243            this.progress = progress;
3244          }
3245          @Override
3246          protected boolean lessThan(Object a, Object b) {
3247            // indicate we're making progress
3248            if (progress != null) {
3249              progress.progress();
3250            }
3251            SegmentDescriptor msa = (SegmentDescriptor)a;
3252            SegmentDescriptor msb = (SegmentDescriptor)b;
3253            return comparator.compare(msa.getKey().getData(), 0, 
3254                                      msa.getKey().getLength(), msb.getKey().getData(), 0, 
3255                                      msb.getKey().getLength()) < 0;
3256          }
3257          @Override
3258          public void close() throws IOException {
3259            SegmentDescriptor ms;                           // close inputs
3260            while ((ms = (SegmentDescriptor)pop()) != null) {
3261              ms.cleanup();
3262            }
3263            minSegment = null;
3264          }
3265          @Override
3266          public DataOutputBuffer getKey() throws IOException {
3267            return rawKey;
3268          }
3269          @Override
3270          public ValueBytes getValue() throws IOException {
3271            return rawValue;
3272          }
3273          @Override
3274          public boolean next() throws IOException {
3275            if (size() == 0)
3276              return false;
3277            if (minSegment != null) {
3278              //minSegment is non-null for all invocations of next except the first
3279              //one. For the first invocation, the priority queue is ready for use
3280              //but for the subsequent invocations, first adjust the queue 
3281              adjustPriorityQueue(minSegment);
3282              if (size() == 0) {
3283                minSegment = null;
3284                return false;
3285              }
3286            }
3287            minSegment = (SegmentDescriptor)top();
3288            long startPos = minSegment.in.getPosition(); // Current position in stream
3289            //save the raw key reference
3290            rawKey = minSegment.getKey();
3291            //load the raw value. Re-use the existing rawValue buffer
3292            if (rawValue == null) {
3293              rawValue = minSegment.in.createValueBytes();
3294            }
3295            minSegment.nextRawValue(rawValue);
3296            long endPos = minSegment.in.getPosition(); // End position after reading value
3297            updateProgress(endPos - startPos);
3298            return true;
3299          }
3300          
3301          @Override
3302          public Progress getProgress() {
3303            return mergeProgress; 
3304          }
3305    
3306          private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
3307            long startPos = ms.in.getPosition(); // Current position in stream
3308            boolean hasNext = ms.nextRawKey();
3309            long endPos = ms.in.getPosition(); // End position after reading key
3310            updateProgress(endPos - startPos);
3311            if (hasNext) {
3312              adjustTop();
3313            } else {
3314              pop();
3315              ms.cleanup();
3316            }
3317          }
3318    
3319          private void updateProgress(long bytesProcessed) {
3320            totalBytesProcessed += bytesProcessed;
3321            if (progPerByte > 0) {
3322              mergeProgress.set(totalBytesProcessed * progPerByte);
3323            }
3324          }
3325          
      /** Performs a single-level merge; invoked one or more times depending
       * on the merge factor and the number of segments.
       * @return RawKeyValueIterator
       * @throws IOException
       */
3331          public RawKeyValueIterator merge() throws IOException {
3332            //create the MergeStreams from the sorted map created in the constructor
3333            //and dump the final output to a file
3334            int numSegments = sortedSegmentSizes.size();
3335            int origFactor = factor;
3336            int passNo = 1;
3337            LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
3338            do {
3339              //get the factor for this pass of merge
3340              factor = getPassFactor(passNo, numSegments);
3341              List<SegmentDescriptor> segmentsToMerge =
3342                new ArrayList<SegmentDescriptor>();
3343              int segmentsConsidered = 0;
3344              int numSegmentsToConsider = factor;
3345              while (true) {
3346                //extract the smallest 'factor' number of segment pointers from the 
3347                //TreeMap. Call cleanup on the empty segments (no key/value data)
3348                SegmentDescriptor[] mStream = 
3349                  getSegmentDescriptors(numSegmentsToConsider);
3350                for (int i = 0; i < mStream.length; i++) {
3351                  if (mStream[i].nextRawKey()) {
3352                    segmentsToMerge.add(mStream[i]);
3353                    segmentsConsidered++;
3354                    // Count the fact that we read some bytes in calling nextRawKey()
3355                    updateProgress(mStream[i].in.getPosition());
3356                  }
3357                  else {
3358                    mStream[i].cleanup();
3359                    numSegments--; //we ignore this segment for the merge
3360                  }
3361                }
3362                //if we have the desired number of segments
3363                //or looked at all available segments, we break
3364                if (segmentsConsidered == factor || 
3365                    sortedSegmentSizes.size() == 0) {
3366                  break;
3367                }
3368                  
3369                numSegmentsToConsider = factor - segmentsConsidered;
3370              }
3371              //feed the streams to the priority queue
3372              initialize(segmentsToMerge.size()); clear();
3373              for (int i = 0; i < segmentsToMerge.size(); i++) {
3374                put(segmentsToMerge.get(i));
3375              }
          //if fewer segments remain than the merge factor, just return the
          //iterator; otherwise do another single-level merge
3378              if (numSegments <= factor) {
3379                //calculate the length of the remaining segments. Required for 
3380                //calculating the merge progress
3381                long totalBytes = 0;
3382                for (int i = 0; i < segmentsToMerge.size(); i++) {
3383                  totalBytes += segmentsToMerge.get(i).segmentLength;
3384                }
3385                if (totalBytes != 0) //being paranoid
3386                  progPerByte = 1.0f / (float)totalBytes;
3387                //reset factor to what it originally was
3388                factor = origFactor;
3389                return this;
3390              } else {
3391                //we want to spread the creation of temp files on multiple disks if 
3392                //available under the space constraints
3393                long approxOutputSize = 0; 
3394                for (SegmentDescriptor s : segmentsToMerge) {
3395                  approxOutputSize += s.segmentLength + 
3396                                      ChecksumFileSystem.getApproxChkSumLength(
3397                                      s.segmentLength);
3398                }
3399                Path tmpFilename = 
3400                  new Path(tmpDir, "intermediate").suffix("." + passNo);
3401    
3402                Path outputFile =  lDirAlloc.getLocalPathForWrite(
3403                                                    tmpFilename.toString(),
3404                                                    approxOutputSize, conf);
3405                if(LOG.isDebugEnabled()) { 
3406                  LOG.debug("writing intermediate results to " + outputFile);
3407                }
3408                Writer writer = cloneFileAttributes(
3409                                                    fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
3410                                                    fs.makeQualified(outputFile), null);
3411                writer.sync = null; //disable sync for temp files
3412                writeFile(this, writer);
3413                writer.close();
3414                
3415                //we finished one single level merge; now clean up the priority 
3416                //queue
3417                this.close();
3418                
3419                SegmentDescriptor tempSegment = 
3420                  new SegmentDescriptor(0,
3421                      fs.getFileStatus(outputFile).getLen(), outputFile);
3422                //put the segment back in the TreeMap
3423                sortedSegmentSizes.put(tempSegment, null);
3424                numSegments = sortedSegmentSizes.size();
3425                passNo++;
3426              }
          //only the first pass may use a reduced merge factor, so reset the 
          //factor to what it originally was
3429              factor = origFactor;
3430            } while(true);
3431          }
3432      
      /** Determines the number of segments to merge in a given pass. On the
       * first pass, fewer than <code>factor</code> segments may be merged so
       * that every subsequent pass, including the final one, merges exactly
       * <code>factor</code> segments (see HADOOP-591).
       */
3434          public int getPassFactor(int passNo, int numSegments) {
3435            if (passNo > 1 || numSegments <= factor || factor == 1) 
3436              return factor;
3437            int mod = (numSegments - 1) % (factor - 1);
3438            if (mod == 0)
3439              return factor;
3440            return mod + 1;
3441          }
3442          
      /** Return (and remove) the requested number of segment descriptors from
       * the sorted map.
       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] segmentDescriptors = 
          new SegmentDescriptor[numDescriptors];
        Iterator<SegmentDescriptor> iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          segmentDescriptors[i++] = iter.next();
          iter.remove();
        }
        return segmentDescriptors;
      }
3459        } // SequenceFile.Sorter.MergeQueue
3460    
    /** This class defines a merge segment. It can be subclassed to provide a 
     * customized cleanup method implementation. In the default implementation, 
     * cleanup closes the file handle and deletes the file, unless the input 
     * is marked as preserved. 
     */
3465        public class SegmentDescriptor implements Comparable {
3466          
3467          long segmentOffset; //the start of the segment in the file
3468          long segmentLength; //the length of the segment
3469          Path segmentPathName; //the path name of the file containing the segment
3470          boolean ignoreSync = true; //set to true for temp files
3471          private Reader in = null; 
3472          private DataOutputBuffer rawKey = null; //this will hold the current key
3473          private boolean preserveInput = false; //delete input segment files?
3474          
3475          /** Constructs a segment
3476           * @param segmentOffset the offset of the segment in the file
3477           * @param segmentLength the length of the segment
3478           * @param segmentPathName the path name of the file containing the segment
3479           */
3480          public SegmentDescriptor (long segmentOffset, long segmentLength, 
3481                                    Path segmentPathName) {
3482            this.segmentOffset = segmentOffset;
3483            this.segmentLength = segmentLength;
3484            this.segmentPathName = segmentPathName;
3485          }
3486          
      /** Enable sync checks, i.e. do not ignore sync markers while reading. */
      public void doSync() { ignoreSync = false; }
3489          
      /** Controls whether the input segment files are preserved (i.e. not
       * deleted) when no longer needed. */
3491          public void preserveInput(boolean preserve) {
3492            preserveInput = preserve;
3493          }
3494    
3495          public boolean shouldPreserveInput() {
3496            return preserveInput;
3497          }
3498          
3499          @Override
3500          public int compareTo(Object o) {
3501            SegmentDescriptor that = (SegmentDescriptor)o;
3502            if (this.segmentLength != that.segmentLength) {
3503              return (this.segmentLength < that.segmentLength ? -1 : 1);
3504            }
3505            if (this.segmentOffset != that.segmentOffset) {
3506              return (this.segmentOffset < that.segmentOffset ? -1 : 1);
3507            }
3508            return (this.segmentPathName.toString()).
3509              compareTo(that.segmentPathName.toString());
3510          }
3511    
3512          @Override
3513          public boolean equals(Object o) {
3514            if (!(o instanceof SegmentDescriptor)) {
3515              return false;
3516            }
3517            SegmentDescriptor that = (SegmentDescriptor)o;
3518            if (this.segmentLength == that.segmentLength &&
3519                this.segmentOffset == that.segmentOffset &&
3520                this.segmentPathName.toString().equals(
3521                  that.segmentPathName.toString())) {
3522              return true;
3523            }
3524            return false;
3525          }
3526    
3527          @Override
3528          public int hashCode() {
3529            return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
3530          }
3531    
3532          /** Fills up the rawKey object with the key returned by the Reader
3533           * @return true if there is a key returned; false, otherwise
3534           * @throws IOException
3535           */
3536          public boolean nextRawKey() throws IOException {
3537            if (in == null) {
3538              int bufferSize = getBufferSize(conf); 
3539              Reader reader = new Reader(conf,
3540                                         Reader.file(segmentPathName), 
3541                                         Reader.bufferSize(bufferSize),
3542                                         Reader.start(segmentOffset), 
3543                                         Reader.length(segmentLength));
3544            
3545              //sometimes we ignore syncs especially for temp merge files
3546              if (ignoreSync) reader.ignoreSync();
3547    
3548              if (reader.getKeyClass() != keyClass)
3549                throw new IOException("wrong key class: " + reader.getKeyClass() +
3550                                      " is not " + keyClass);
3551              if (reader.getValueClass() != valClass)
3552                throw new IOException("wrong value class: "+reader.getValueClass()+
3553                                      " is not " + valClass);
3554              this.in = reader;
3555              rawKey = new DataOutputBuffer();
3556            }
3557            rawKey.reset();
        int keyLength = in.nextRawKey(rawKey);
3560            return (keyLength >= 0);
3561          }
3562    
      /** Fills up the passed rawValue with the value corresponding to the key
       * read earlier.
       * @param rawValue the ValueBytes object to fill
       * @return the length of the value
       * @throws IOException
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        return in.nextRawValue(rawValue);
      }
3573          
3574          /** Returns the stored rawKey */
3575          public DataOutputBuffer getKey() {
3576            return rawKey;
3577          }
3578          
      /** Closes the underlying reader. */
3580          private void close() throws IOException {
3581            this.in.close();
3582            this.in = null;
3583          }
3584    
3585          /** The default cleanup. Subclasses can override this with a custom 
3586           * cleanup 
3587           */
3588          public void cleanup() throws IOException {
3589            close();
3590            if (!preserveInput) {
3591              fs.delete(segmentPathName, true);
3592            }
3593          }
3594        } // SequenceFile.Sorter.SegmentDescriptor
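
    /*
     * Illustrative subclass sketch (hypothetical): declared inside Sorter,
     * like LinkedSegmentsDescriptor below, so it can reach the enclosing fs
     * field and the package-private segmentPathName. It overrides cleanup()
     * to archive the segment's file instead of deleting it.
     *
     *   private class ArchivingSegmentDescriptor extends SegmentDescriptor {
     *     ArchivingSegmentDescriptor(long offset, long length, Path path) {
     *       super(offset, length, path);
     *     }
     *     @Override
     *     public void cleanup() throws IOException {
     *       preserveInput(true);  // super.cleanup() then only closes the reader
     *       super.cleanup();
     *       fs.rename(segmentPathName,
     *                 new Path("/archive", segmentPathName.getName()));
     *     }
     *   }
     */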
3595        
    /** This class describes a segment that shares its underlying file with
     *  other segments, via a parent SegmentContainer that deletes the file
     *  once all contained segments have been consumed.
     */
3599        private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3600    
3601          SegmentContainer parentContainer = null;
3602    
3603          /** Constructs a segment
3604           * @param segmentOffset the offset of the segment in the file
3605           * @param segmentLength the length of the segment
3606           * @param segmentPathName the path name of the file containing the segment
3607           * @param parent the parent SegmentContainer that holds the segment
3608           */
3609          public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3610                                           Path segmentPathName, SegmentContainer parent) {
3611            super(segmentOffset, segmentLength, segmentPathName);
3612            this.parentContainer = parent;
3613          }
      /** Cleanup for a linked segment: closes the reader and, unless the
       * input is preserved, notifies the parent container so the shared file
       * can be deleted once all of its segments have been cleaned up.
       */
3617          @Override
3618          public void cleanup() throws IOException {
3619            super.close();
3620            if (super.shouldPreserveInput()) return;
3621            parentContainer.cleanup();
3622          }
3623          
3624          @Override
3625          public boolean equals(Object o) {
3626            if (!(o instanceof LinkedSegmentsDescriptor)) {
3627              return false;
3628            }
3629            return super.equals(o);
3630          }
3631        } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3632    
3633        /** The class that defines a container for segments to be merged. Primarily
3634         * required to delete temp files as soon as all the contained segments
3635         * have been looked at */
3636        private class SegmentContainer {
3637          private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3638          private int numSegmentsContained; //# of segments contained
3639          private Path inName; //input file from where segments are created
3640          
3641          //the list of segments read from the file
      private ArrayList<SegmentDescriptor> segments = 
        new ArrayList<SegmentDescriptor>();
3644          /** This constructor is there primarily to serve the sort routine that 
3645           * generates a single output file with an associated index file */
3646          public SegmentContainer(Path inName, Path indexIn) throws IOException {
3647            //get the segments from indexIn
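        //index layout, as written by SortPass.flush: a sequence of
        //<segmentStart, segmentLength> VLong pairs, one pair per segment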
3648            FSDataInputStream fsIndexIn = fs.open(indexIn);
3649            long end = fs.getFileStatus(indexIn).getLen();
3650            while (fsIndexIn.getPos() < end) {
3651              long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3652              long segmentLength = WritableUtils.readVLong(fsIndexIn);
3653              Path segmentName = inName;
3654              segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3655                                                        segmentLength, segmentName, this));
3656            }
3657            fsIndexIn.close();
3658            fs.delete(indexIn, true);
3659            numSegmentsContained = segments.size();
3660            this.inName = inName;
3661          }
3662    
      public List<SegmentDescriptor> getSegmentList() {
3664            return segments;
3665          }
3666          public void cleanup() throws IOException {
3667            numSegmentsCleanedUp++;
3668            if (numSegmentsCleanedUp == numSegmentsContained) {
3669              fs.delete(inName, true);
3670            }
3671          }
3672        } //SequenceFile.Sorter.SegmentContainer
3673    
3674      } // SequenceFile.Sorter
3675    
3676    } // SequenceFile