001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io;
020    
021    import java.io.*;
022    import java.util.*;
023    import java.rmi.server.UID;
024    import java.security.MessageDigest;
025    import org.apache.commons.logging.*;
026    import org.apache.hadoop.util.Options;
027    import org.apache.hadoop.fs.*;
028    import org.apache.hadoop.fs.Options.CreateOpts;
029    import org.apache.hadoop.io.compress.CodecPool;
030    import org.apache.hadoop.io.compress.CompressionCodec;
031    import org.apache.hadoop.io.compress.CompressionInputStream;
032    import org.apache.hadoop.io.compress.CompressionOutputStream;
033    import org.apache.hadoop.io.compress.Compressor;
034    import org.apache.hadoop.io.compress.Decompressor;
035    import org.apache.hadoop.io.compress.DefaultCodec;
036    import org.apache.hadoop.io.compress.GzipCodec;
037    import org.apache.hadoop.io.compress.zlib.ZlibFactory;
038    import org.apache.hadoop.io.serializer.Deserializer;
039    import org.apache.hadoop.io.serializer.Serializer;
040    import org.apache.hadoop.io.serializer.SerializationFactory;
041    import org.apache.hadoop.classification.InterfaceAudience;
042    import org.apache.hadoop.classification.InterfaceStability;
043    import org.apache.hadoop.conf.*;
044    import org.apache.hadoop.util.Progressable;
045    import org.apache.hadoop.util.Progress;
046    import org.apache.hadoop.util.ReflectionUtils;
047    import org.apache.hadoop.util.NativeCodeLoader;
048    import org.apache.hadoop.util.MergeSort;
049    import org.apache.hadoop.util.PriorityQueue;
050    import org.apache.hadoop.util.Time;
051    
052    /** 
053     * <code>SequenceFile</code>s are flat files consisting of binary key/value 
054     * pairs.
055     * 
056     * <p><code>SequenceFile</code> provides {@link SequenceFile.Writer},
057     * {@link SequenceFile.Reader} and {@link Sorter} classes for writing,
058     * reading and sorting respectively.</p>
059     * 
060     * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
061     * {@link CompressionType} used to compress key/value pairs:
062     * <ol>
063     *   <li>
064     *   <code>Writer</code> : Uncompressed records.
065     *   </li>
066     *   <li>
067     *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
068     *                                       values.
069     *   </li>
070     *   <li>
 *   <code>BlockCompressWriter</code> : Block-compressed files, both keys &amp;
 *                                      values are collected in 'blocks' 
 *                                      separately and compressed. The size of 
 *                                      the 'block' is configurable.
 *   </li>
 * </ol>
076     * 
077     * <p>The actual compression algorithm used to compress key and/or values can be
078     * specified by using the appropriate {@link CompressionCodec}.</p>
079     * 
080     * <p>The recommended way is to use the static <tt>createWriter</tt> methods
 * provided by the <code>SequenceFile</code> to choose the preferred format.</p>
082     *
083     * <p>The {@link SequenceFile.Reader} acts as the bridge and can read any of the
084     * above <code>SequenceFile</code> formats.</p>
085     *
086     * <h4 id="Formats">SequenceFile Formats</h4>
087     * 
088     * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
089     * depending on the <code>CompressionType</code> specified. All of them share a
090     * <a href="#Header">common header</a> described below.
091     * 
092     * <h5 id="Header">SequenceFile Header</h5>
093     * <ul>
094     *   <li>
095     *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
096     *             version number (e.g. SEQ4 or SEQ6)
097     *   </li>
098     *   <li>
099     *   keyClassName -key class
100     *   </li>
101     *   <li>
102     *   valueClassName - value class
103     *   </li>
104     *   <li>
105     *   compression - A boolean which specifies if compression is turned on for 
106     *                 keys/values in this file.
107     *   </li>
108     *   <li>
109     *   blockCompression - A boolean which specifies if block-compression is 
110     *                      turned on for keys/values in this file.
111     *   </li>
112     *   <li>
113     *   compression codec - <code>CompressionCodec</code> class which is used for  
114     *                       compression of keys and/or values (if compression is 
115     *                       enabled).
116     *   </li>
117     *   <li>
118     *   metadata - {@link Metadata} for this file.
119     *   </li>
120     *   <li>
121     *   sync - A sync marker to denote end of the header.
122     *   </li>
123     * </ul>
124     * 
 * <h5 id="UncompressedFormat">Uncompressed SequenceFile Format</h5>
126     * <ul>
127     * <li>
128     * <a href="#Header">Header</a>
129     * </li>
130     * <li>
131     * Record
132     *   <ul>
133     *     <li>Record length</li>
134     *     <li>Key length</li>
135     *     <li>Key</li>
136     *     <li>Value</li>
137     *   </ul>
138     * </li>
139     * <li>
140     * A sync-marker every few <code>100</code> bytes or so.
141     * </li>
142     * </ul>
143     *
 * <h5 id="RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
145     * <ul>
146     * <li>
147     * <a href="#Header">Header</a>
148     * </li>
149     * <li>
150     * Record
151     *   <ul>
152     *     <li>Record length</li>
153     *     <li>Key length</li>
154     *     <li>Key</li>
155     *     <li><i>Compressed</i> Value</li>
156     *   </ul>
157     * </li>
158     * <li>
159     * A sync-marker every few <code>100</code> bytes or so.
160     * </li>
161     * </ul>
162     * 
 * <h5 id="BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
164     * <ul>
165     * <li>
166     * <a href="#Header">Header</a>
167     * </li>
168     * <li>
169     * Record <i>Block</i>
170     *   <ul>
171     *     <li>Uncompressed number of records in the block</li>
172     *     <li>Compressed key-lengths block-size</li>
173     *     <li>Compressed key-lengths block</li>
174     *     <li>Compressed keys block-size</li>
175     *     <li>Compressed keys block</li>
176     *     <li>Compressed value-lengths block-size</li>
177     *     <li>Compressed value-lengths block</li>
178     *     <li>Compressed values block-size</li>
179     *     <li>Compressed values block</li>
180     *   </ul>
181     * </li>
182     * <li>
183     * A sync-marker every block.
184     * </li>
185     * </ul>
186     * 
187     * <p>The compressed blocks of key lengths and value lengths consist of the 
188     * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
189     * format.</p>
190     * 
191     * @see CompressionCodec
192     */
193    @InterfaceAudience.Public
194    @InterfaceStability.Stable
195    public class SequenceFile {
196      private static final Log LOG = LogFactory.getLog(SequenceFile.class);
197    
198      private SequenceFile() {}                         // no public ctor
199    
200      private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
201      private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
202      private static final byte VERSION_WITH_METADATA = (byte)6;
203      private static byte[] VERSION = new byte[] {
204        (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
205      };
206    
207      private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
208      private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
209      private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
210    
211      /** The number of bytes between sync points.*/
212      public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
213    
214      /** 
215       * The compression type used to compress key/value pairs in the 
216       * {@link SequenceFile}.
217       * 
218       * @see SequenceFile.Writer
219       */
220      public static enum CompressionType {
221        /** Do not compress records. */
222        NONE, 
223        /** Compress values only, each separately. */
224        RECORD,
225        /** Compress sequences of records together in blocks. */
226        BLOCK
227      }
228    
229      /**
230       * Get the compression type for the reduce outputs
231       * @param job the job config to look in
232       * @return the kind of compression to use
233       */
234      static public CompressionType getDefaultCompressionType(Configuration job) {
235        String name = job.get("io.seqfile.compression.type");
236        return name == null ? CompressionType.RECORD : 
237          CompressionType.valueOf(name);
238      }
239      
240      /**
241       * Set the default compression type for sequence files.
242       * @param job the configuration to modify
243       * @param val the new compression type (none, block, record)
244       */
245      static public void setDefaultCompressionType(Configuration job, 
246                                                   CompressionType val) {
247        job.set("io.seqfile.compression.type", val.toString());
248      }
249    
250      /**
251       * Create a new Writer with the given options.
252       * @param conf the configuration to use
253       * @param opts the options to create the file with
254       * @return a new Writer
255       * @throws IOException
256       */
257      public static Writer createWriter(Configuration conf, Writer.Option... opts
258                                        ) throws IOException {
259        Writer.CompressionOption compressionOption = 
260          Options.getOption(Writer.CompressionOption.class, opts);
261        CompressionType kind;
262        if (compressionOption != null) {
263          kind = compressionOption.getValue();
264        } else {
265          kind = getDefaultCompressionType(conf);
266          opts = Options.prependOptions(opts, Writer.compression(kind));
267        }
268        switch (kind) {
269          default:
270          case NONE:
271            return new Writer(conf, opts);
272          case RECORD:
273            return new RecordCompressWriter(conf, opts);
274          case BLOCK:
275            return new BlockCompressWriter(conf, opts);
276        }
277      }
278    
279      /**
280       * Construct the preferred type of SequenceFile Writer.
281       * @param fs The configured filesystem. 
282       * @param conf The configuration.
283       * @param name The name of the file. 
284       * @param keyClass The 'key' type.
285       * @param valClass The 'value' type.
286       * @return Returns the handle to the constructed SequenceFile Writer.
287       * @throws IOException
288       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
289       *     instead.
290       */
291      @Deprecated
292      public static Writer 
293        createWriter(FileSystem fs, Configuration conf, Path name, 
294                     Class keyClass, Class valClass) throws IOException {
295        return createWriter(conf, Writer.filesystem(fs),
296                            Writer.file(name), Writer.keyClass(keyClass),
297                            Writer.valueClass(valClass));
298      }
299      
300      /**
301       * Construct the preferred type of SequenceFile Writer.
302       * @param fs The configured filesystem. 
303       * @param conf The configuration.
304       * @param name The name of the file. 
305       * @param keyClass The 'key' type.
306       * @param valClass The 'value' type.
307       * @param compressionType The compression type.
308       * @return Returns the handle to the constructed SequenceFile Writer.
309       * @throws IOException
310       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
311       *     instead.
312       */
313      @Deprecated
314      public static Writer 
315        createWriter(FileSystem fs, Configuration conf, Path name, 
316                     Class keyClass, Class valClass, 
317                     CompressionType compressionType) throws IOException {
318        return createWriter(conf, Writer.filesystem(fs),
319                            Writer.file(name), Writer.keyClass(keyClass),
320                            Writer.valueClass(valClass), 
321                            Writer.compression(compressionType));
322      }
323      
324      /**
325       * Construct the preferred type of SequenceFile Writer.
326       * @param fs The configured filesystem. 
327       * @param conf The configuration.
328       * @param name The name of the file. 
329       * @param keyClass The 'key' type.
330       * @param valClass The 'value' type.
331       * @param compressionType The compression type.
332       * @param progress The Progressable object to track progress.
333       * @return Returns the handle to the constructed SequenceFile Writer.
334       * @throws IOException
335       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
336       *     instead.
337       */
338      @Deprecated
339      public static Writer
340        createWriter(FileSystem fs, Configuration conf, Path name, 
341                     Class keyClass, Class valClass, CompressionType compressionType,
342                     Progressable progress) throws IOException {
343        return createWriter(conf, Writer.file(name),
344                            Writer.filesystem(fs),
345                            Writer.keyClass(keyClass),
346                            Writer.valueClass(valClass), 
347                            Writer.compression(compressionType),
348                            Writer.progressable(progress));
349      }
350    
351      /**
352       * Construct the preferred type of SequenceFile Writer.
353       * @param fs The configured filesystem. 
354       * @param conf The configuration.
355       * @param name The name of the file. 
356       * @param keyClass The 'key' type.
357       * @param valClass The 'value' type.
358       * @param compressionType The compression type.
359       * @param codec The compression codec.
360       * @return Returns the handle to the constructed SequenceFile Writer.
361       * @throws IOException
362       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
363       *     instead.
364       */
365      @Deprecated
366      public static Writer 
367        createWriter(FileSystem fs, Configuration conf, Path name, 
368                     Class keyClass, Class valClass, CompressionType compressionType, 
369                     CompressionCodec codec) throws IOException {
370        return createWriter(conf, Writer.file(name),
371                            Writer.filesystem(fs),
372                            Writer.keyClass(keyClass),
373                            Writer.valueClass(valClass), 
374                            Writer.compression(compressionType, codec));
375      }
376      
377      /**
378       * Construct the preferred type of SequenceFile Writer.
379       * @param fs The configured filesystem. 
380       * @param conf The configuration.
381       * @param name The name of the file. 
382       * @param keyClass The 'key' type.
383       * @param valClass The 'value' type.
384       * @param compressionType The compression type.
385       * @param codec The compression codec.
386       * @param progress The Progressable object to track progress.
387       * @param metadata The metadata of the file.
388       * @return Returns the handle to the constructed SequenceFile Writer.
389       * @throws IOException
390       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
391       *     instead.
392       */
393      @Deprecated
394      public static Writer
395        createWriter(FileSystem fs, Configuration conf, Path name, 
396                     Class keyClass, Class valClass, 
397                     CompressionType compressionType, CompressionCodec codec,
398                     Progressable progress, Metadata metadata) throws IOException {
399        return createWriter(conf, Writer.file(name),
400                            Writer.filesystem(fs),
401                            Writer.keyClass(keyClass),
402                            Writer.valueClass(valClass),
403                            Writer.compression(compressionType, codec),
404                            Writer.progressable(progress),
405                            Writer.metadata(metadata));
406      }
407    
408      /**
409       * Construct the preferred type of SequenceFile Writer.
410       * @param fs The configured filesystem.
411       * @param conf The configuration.
412       * @param name The name of the file.
413       * @param keyClass The 'key' type.
414       * @param valClass The 'value' type.
415       * @param bufferSize buffer size for the underlaying outputstream.
416       * @param replication replication factor for the file.
417       * @param blockSize block size for the file.
418       * @param compressionType The compression type.
419       * @param codec The compression codec.
420       * @param progress The Progressable object to track progress.
421       * @param metadata The metadata of the file.
422       * @return Returns the handle to the constructed SequenceFile Writer.
423       * @throws IOException
424       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
425       *     instead.
426       */
427      @Deprecated
428      public static Writer
429        createWriter(FileSystem fs, Configuration conf, Path name,
430                     Class keyClass, Class valClass, int bufferSize,
431                     short replication, long blockSize,
432                     CompressionType compressionType, CompressionCodec codec,
433                     Progressable progress, Metadata metadata) throws IOException {
434        return createWriter(conf, Writer.file(name),
435                            Writer.filesystem(fs),
436                            Writer.keyClass(keyClass),
437                            Writer.valueClass(valClass), 
438                            Writer.bufferSize(bufferSize), 
439                            Writer.replication(replication),
440                            Writer.blockSize(blockSize),
441                            Writer.compression(compressionType, codec),
442                            Writer.progressable(progress),
443                            Writer.metadata(metadata));
444      }
445    
446      /**
447       * Construct the preferred type of SequenceFile Writer.
448       * @param fs The configured filesystem.
449       * @param conf The configuration.
450       * @param name The name of the file.
451       * @param keyClass The 'key' type.
452       * @param valClass The 'value' type.
453       * @param bufferSize buffer size for the underlaying outputstream.
454       * @param replication replication factor for the file.
455       * @param blockSize block size for the file.
456       * @param createParent create parent directory if non-existent
457       * @param compressionType The compression type.
458       * @param codec The compression codec.
459       * @param metadata The metadata of the file.
460       * @return Returns the handle to the constructed SequenceFile Writer.
461       * @throws IOException
462       */
463      @Deprecated
464      public static Writer
465      createWriter(FileSystem fs, Configuration conf, Path name,
466                   Class keyClass, Class valClass, int bufferSize,
467                   short replication, long blockSize, boolean createParent,
468                   CompressionType compressionType, CompressionCodec codec,
469                   Metadata metadata) throws IOException {
470        return createWriter(FileContext.getFileContext(fs.getUri(), conf),
471            conf, name, keyClass, valClass, compressionType, codec,
472            metadata, EnumSet.of(CreateFlag.CREATE,CreateFlag.OVERWRITE),
473            CreateOpts.bufferSize(bufferSize),
474            createParent ? CreateOpts.createParent()
475                         : CreateOpts.donotCreateParent(),
476            CreateOpts.repFac(replication),
477            CreateOpts.blockSize(blockSize)
478          );
479      }
480    
481      /**
482       * Construct the preferred type of SequenceFile Writer.
483       * @param fc The context for the specified file.
484       * @param conf The configuration.
485       * @param name The name of the file.
486       * @param keyClass The 'key' type.
487       * @param valClass The 'value' type.
488       * @param compressionType The compression type.
489       * @param codec The compression codec.
490       * @param metadata The metadata of the file.
491       * @param createFlag gives the semantics of create: overwrite, append etc.
492       * @param opts file creation options; see {@link CreateOpts}.
493       * @return Returns the handle to the constructed SequenceFile Writer.
494       * @throws IOException
495       */
496      public static Writer
497      createWriter(FileContext fc, Configuration conf, Path name,
498                   Class keyClass, Class valClass,
499                   CompressionType compressionType, CompressionCodec codec,
500                   Metadata metadata,
501                   final EnumSet<CreateFlag> createFlag, CreateOpts... opts)
502                   throws IOException {
503        return createWriter(conf, fc.create(name, createFlag, opts),
504              keyClass, valClass, compressionType, codec, metadata).ownStream();
505      }
506    
507      /**
508       * Construct the preferred type of SequenceFile Writer.
509       * @param fs The configured filesystem. 
510       * @param conf The configuration.
511       * @param name The name of the file. 
512       * @param keyClass The 'key' type.
513       * @param valClass The 'value' type.
514       * @param compressionType The compression type.
515       * @param codec The compression codec.
516       * @param progress The Progressable object to track progress.
517       * @return Returns the handle to the constructed SequenceFile Writer.
518       * @throws IOException
519       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
520       *     instead.
521       */
522      @Deprecated
523      public static Writer
524        createWriter(FileSystem fs, Configuration conf, Path name, 
525                     Class keyClass, Class valClass, 
526                     CompressionType compressionType, CompressionCodec codec,
527                     Progressable progress) throws IOException {
528        return createWriter(conf, Writer.file(name),
529                            Writer.filesystem(fs),
530                            Writer.keyClass(keyClass),
531                            Writer.valueClass(valClass),
532                            Writer.compression(compressionType, codec),
533                            Writer.progressable(progress));
534      }
535    
536      /**
537       * Construct the preferred type of 'raw' SequenceFile Writer.
538       * @param conf The configuration.
539       * @param out The stream on top which the writer is to be constructed.
540       * @param keyClass The 'key' type.
541       * @param valClass The 'value' type.
542       * @param compressionType The compression type.
543       * @param codec The compression codec.
544       * @param metadata The metadata of the file.
545       * @return Returns the handle to the constructed SequenceFile Writer.
546       * @throws IOException
547       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
548       *     instead.
549       */
550      @Deprecated
551      public static Writer
552        createWriter(Configuration conf, FSDataOutputStream out, 
553                     Class keyClass, Class valClass,
554                     CompressionType compressionType,
555                     CompressionCodec codec, Metadata metadata) throws IOException {
556        return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
557                            Writer.valueClass(valClass), 
558                            Writer.compression(compressionType, codec),
559                            Writer.metadata(metadata));
560      }
561      
562      /**
563       * Construct the preferred type of 'raw' SequenceFile Writer.
564       * @param conf The configuration.
565       * @param out The stream on top which the writer is to be constructed.
566       * @param keyClass The 'key' type.
567       * @param valClass The 'value' type.
568       * @param compressionType The compression type.
569       * @param codec The compression codec.
570       * @return Returns the handle to the constructed SequenceFile Writer.
571       * @throws IOException
572       * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)}
573       *     instead.
574       */
575      @Deprecated
576      public static Writer
577        createWriter(Configuration conf, FSDataOutputStream out, 
578                     Class keyClass, Class valClass, CompressionType compressionType,
579                     CompressionCodec codec) throws IOException {
580        return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass),
581                            Writer.valueClass(valClass),
582                            Writer.compression(compressionType, codec));
583      }
584      
585    
586      /** The interface to 'raw' values of SequenceFiles. */
587      public static interface ValueBytes {
588    
589        /** Writes the uncompressed bytes to the outStream.
590         * @param outStream : Stream to write uncompressed bytes into.
591         * @throws IOException
592         */
593        public void writeUncompressedBytes(DataOutputStream outStream)
594          throws IOException;
595    
596        /** Write compressed bytes to outStream. 
597         * Note: that it will NOT compress the bytes if they are not compressed.
598         * @param outStream : Stream to write compressed bytes into.
599         */
600        public void writeCompressedBytes(DataOutputStream outStream) 
601          throws IllegalArgumentException, IOException;
602    
603        /**
604         * Size of stored data.
605         */
606        public int getSize();
607      }
608      
609      private static class UncompressedBytes implements ValueBytes {
610        private int dataSize;
611        private byte[] data;
612        
613        private UncompressedBytes() {
614          data = null;
615          dataSize = 0;
616        }
617        
618        private void reset(DataInputStream in, int length) throws IOException {
619          if (data == null) {
620            data = new byte[length];
621          } else if (length > data.length) {
622            data = new byte[Math.max(length, data.length * 2)];
623          }
624          dataSize = -1;
625          in.readFully(data, 0, length);
626          dataSize = length;
627        }
628        
629        @Override
630        public int getSize() {
631          return dataSize;
632        }
633        
634        @Override
635        public void writeUncompressedBytes(DataOutputStream outStream)
636          throws IOException {
637          outStream.write(data, 0, dataSize);
638        }
639    
640        @Override
641        public void writeCompressedBytes(DataOutputStream outStream) 
642          throws IllegalArgumentException, IOException {
643          throw 
644            new IllegalArgumentException("UncompressedBytes cannot be compressed!");
645        }
646    
647      } // UncompressedBytes
648      
649      private static class CompressedBytes implements ValueBytes {
650        private int dataSize;
651        private byte[] data;
652        DataInputBuffer rawData = null;
653        CompressionCodec codec = null;
654        CompressionInputStream decompressedStream = null;
655    
656        private CompressedBytes(CompressionCodec codec) {
657          data = null;
658          dataSize = 0;
659          this.codec = codec;
660        }
661    
662        private void reset(DataInputStream in, int length) throws IOException {
663          if (data == null) {
664            data = new byte[length];
665          } else if (length > data.length) {
666            data = new byte[Math.max(length, data.length * 2)];
667          } 
668          dataSize = -1;
669          in.readFully(data, 0, length);
670          dataSize = length;
671        }
672        
673        @Override
674        public int getSize() {
675          return dataSize;
676        }
677        
678        @Override
679        public void writeUncompressedBytes(DataOutputStream outStream)
680          throws IOException {
681          if (decompressedStream == null) {
682            rawData = new DataInputBuffer();
683            decompressedStream = codec.createInputStream(rawData);
684          } else {
685            decompressedStream.resetState();
686          }
687          rawData.reset(data, 0, dataSize);
688    
689          byte[] buffer = new byte[8192];
690          int bytesRead = 0;
691          while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
692            outStream.write(buffer, 0, bytesRead);
693          }
694        }
695    
696        @Override
697        public void writeCompressedBytes(DataOutputStream outStream) 
698          throws IllegalArgumentException, IOException {
699          outStream.write(data, 0, dataSize);
700        }
701    
702      } // CompressedBytes
703      
704      /**
705       * The class encapsulating with the metadata of a file.
706       * The metadata of a file is a list of attribute name/value
707       * pairs of Text type.
708       *
709       */
710      public static class Metadata implements Writable {
711    
712        private TreeMap<Text, Text> theMetadata;
713        
714        public Metadata() {
715          this(new TreeMap<Text, Text>());
716        }
717        
718        public Metadata(TreeMap<Text, Text> arg) {
719          if (arg == null) {
720            this.theMetadata = new TreeMap<Text, Text>();
721          } else {
722            this.theMetadata = arg;
723          }
724        }
725        
726        public Text get(Text name) {
727          return this.theMetadata.get(name);
728        }
729        
730        public void set(Text name, Text value) {
731          this.theMetadata.put(name, value);
732        }
733        
734        public TreeMap<Text, Text> getMetadata() {
735          return new TreeMap<Text, Text>(this.theMetadata);
736        }
737        
738        @Override
739        public void write(DataOutput out) throws IOException {
740          out.writeInt(this.theMetadata.size());
741          Iterator<Map.Entry<Text, Text>> iter =
742            this.theMetadata.entrySet().iterator();
743          while (iter.hasNext()) {
744            Map.Entry<Text, Text> en = iter.next();
745            en.getKey().write(out);
746            en.getValue().write(out);
747          }
748        }
749    
750        @Override
751        public void readFields(DataInput in) throws IOException {
752          int sz = in.readInt();
753          if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
754          this.theMetadata = new TreeMap<Text, Text>();
755          for (int i = 0; i < sz; i++) {
756            Text key = new Text();
757            Text val = new Text();
758            key.readFields(in);
759            val.readFields(in);
760            this.theMetadata.put(key, val);
761          }    
762        }
763    
764        @Override
765        public boolean equals(Object other) {
766          if (other == null) {
767            return false;
768          }
769          if (other.getClass() != this.getClass()) {
770            return false;
771          } else {
772            return equals((Metadata)other);
773          }
774        }
775        
776        public boolean equals(Metadata other) {
777          if (other == null) return false;
778          if (this.theMetadata.size() != other.theMetadata.size()) {
779            return false;
780          }
781          Iterator<Map.Entry<Text, Text>> iter1 =
782            this.theMetadata.entrySet().iterator();
783          Iterator<Map.Entry<Text, Text>> iter2 =
784            other.theMetadata.entrySet().iterator();
785          while (iter1.hasNext() && iter2.hasNext()) {
786            Map.Entry<Text, Text> en1 = iter1.next();
787            Map.Entry<Text, Text> en2 = iter2.next();
788            if (!en1.getKey().equals(en2.getKey())) {
789              return false;
790            }
791            if (!en1.getValue().equals(en2.getValue())) {
792              return false;
793            }
794          }
795          if (iter1.hasNext() || iter2.hasNext()) {
796            return false;
797          }
798          return true;
799        }
800    
801        @Override
802        public int hashCode() {
803          assert false : "hashCode not designed";
804          return 42; // any arbitrary constant will do 
805        }
806        
807        @Override
808        public String toString() {
809          StringBuilder sb = new StringBuilder();
810          sb.append("size: ").append(this.theMetadata.size()).append("\n");
811          Iterator<Map.Entry<Text, Text>> iter =
812            this.theMetadata.entrySet().iterator();
813          while (iter.hasNext()) {
814            Map.Entry<Text, Text> en = iter.next();
815            sb.append("\t").append(en.getKey().toString()).append("\t").append(en.getValue().toString());
816            sb.append("\n");
817          }
818          return sb.toString();
819        }
820      }
821      
822      /** Write key/value pairs to a sequence-format file. */
823      public static class Writer implements java.io.Closeable, Syncable {
824        private Configuration conf;
825        FSDataOutputStream out;
826        boolean ownOutputStream = true;
827        DataOutputBuffer buffer = new DataOutputBuffer();
828    
829        Class keyClass;
830        Class valClass;
831    
832        private final CompressionType compress;
833        CompressionCodec codec = null;
834        CompressionOutputStream deflateFilter = null;
835        DataOutputStream deflateOut = null;
836        Metadata metadata = null;
837        Compressor compressor = null;
838        
839        protected Serializer keySerializer;
840        protected Serializer uncompressedValSerializer;
841        protected Serializer compressedValSerializer;
842        
843        // Insert a globally unique 16-byte value every few entries, so that one
844        // can seek into the middle of a file and then synchronize with record
845        // starts and ends by scanning for this value.
846        long lastSyncPos;                     // position of last sync
847        byte[] sync;                          // 16 random bytes
848        {
849          try {                                       
850            MessageDigest digester = MessageDigest.getInstance("MD5");
851            long time = Time.now();
852            digester.update((new UID()+"@"+time).getBytes());
853            sync = digester.digest();
854          } catch (Exception e) {
855            throw new RuntimeException(e);
856          }
857        }
858    
859        public static interface Option {}
860        
861        static class FileOption extends Options.PathOption 
862                                        implements Option {
863          FileOption(Path path) {
864            super(path);
865          }
866        }
867    
868        /**
869         * @deprecated only used for backwards-compatibility in the createWriter methods
870         * that take FileSystem.
871         */
872        @Deprecated
873        private static class FileSystemOption implements Option {
874          private final FileSystem value;
875          protected FileSystemOption(FileSystem value) {
876            this.value = value;
877          }
878          public FileSystem getValue() {
879            return value;
880          }
881        }
882    
883        static class StreamOption extends Options.FSDataOutputStreamOption 
884                                  implements Option {
885          StreamOption(FSDataOutputStream stream) {
886            super(stream);
887          }
888        }
889    
890        static class BufferSizeOption extends Options.IntegerOption
891                                      implements Option {
892          BufferSizeOption(int value) {
893            super(value);
894          }
895        }
896        
897        static class BlockSizeOption extends Options.LongOption implements Option {
898          BlockSizeOption(long value) {
899            super(value);
900          }
901        }
902    
903        static class ReplicationOption extends Options.IntegerOption
904                                       implements Option {
905          ReplicationOption(int value) {
906            super(value);
907          }
908        }
909    
910        static class KeyClassOption extends Options.ClassOption implements Option {
911          KeyClassOption(Class<?> value) {
912            super(value);
913          }
914        }
915    
916        static class ValueClassOption extends Options.ClassOption
917                                              implements Option {
918          ValueClassOption(Class<?> value) {
919            super(value);
920          }
921        }
922    
923        static class MetadataOption implements Option {
924          private final Metadata value;
925          MetadataOption(Metadata value) {
926            this.value = value;
927          }
928          Metadata getValue() {
929            return value;
930          }
931        }
932    
933        static class ProgressableOption extends Options.ProgressableOption
934                                        implements Option {
935          ProgressableOption(Progressable value) {
936            super(value);
937          }
938        }
939    
940        private static class CompressionOption implements Option {
941          private final CompressionType value;
942          private final CompressionCodec codec;
943          CompressionOption(CompressionType value) {
944            this(value, null);
945          }
946          CompressionOption(CompressionType value, CompressionCodec codec) {
947            this.value = value;
948            this.codec = (CompressionType.NONE != value && null == codec)
949              ? new DefaultCodec()
950              : codec;
951          }
952          CompressionType getValue() {
953            return value;
954          }
955          CompressionCodec getCodec() {
956            return codec;
957          }
958        }
959        
960        public static Option file(Path value) {
961          return new FileOption(value);
962        }
963    
964        /**
965         * @deprecated only used for backwards-compatibility in the createWriter methods
966         * that take FileSystem.
967         */
968        @Deprecated
969        private static Option filesystem(FileSystem fs) {
970          return new SequenceFile.Writer.FileSystemOption(fs);
971        }
972        
973        public static Option bufferSize(int value) {
974          return new BufferSizeOption(value);
975        }
976        
977        public static Option stream(FSDataOutputStream value) {
978          return new StreamOption(value);
979        }
980        
981        public static Option replication(short value) {
982          return new ReplicationOption(value);
983        }
984        
985        public static Option blockSize(long value) {
986          return new BlockSizeOption(value);
987        }
988        
989        public static Option progressable(Progressable value) {
990          return new ProgressableOption(value);
991        }
992    
993        public static Option keyClass(Class<?> value) {
994          return new KeyClassOption(value);
995        }
996        
997        public static Option valueClass(Class<?> value) {
998          return new ValueClassOption(value);
999        }
1000        
1001        public static Option metadata(Metadata value) {
1002          return new MetadataOption(value);
1003        }
1004    
1005        public static Option compression(CompressionType value) {
1006          return new CompressionOption(value);
1007        }
1008    
1009        public static Option compression(CompressionType value,
1010            CompressionCodec codec) {
1011          return new CompressionOption(value, codec);
1012        }
1013        
1014        /**
1015         * Construct a uncompressed writer from a set of options.
1016         * @param conf the configuration to use
1017         * @param options the options used when creating the writer
1018         * @throws IOException if it fails
1019         */
1020        Writer(Configuration conf, 
1021               Option... opts) throws IOException {
1022          BlockSizeOption blockSizeOption = 
1023            Options.getOption(BlockSizeOption.class, opts);
1024          BufferSizeOption bufferSizeOption = 
1025            Options.getOption(BufferSizeOption.class, opts);
1026          ReplicationOption replicationOption = 
1027            Options.getOption(ReplicationOption.class, opts);
1028          ProgressableOption progressOption = 
1029            Options.getOption(ProgressableOption.class, opts);
1030          FileOption fileOption = Options.getOption(FileOption.class, opts);
1031          FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
1032          StreamOption streamOption = Options.getOption(StreamOption.class, opts);
1033          KeyClassOption keyClassOption = 
1034            Options.getOption(KeyClassOption.class, opts);
1035          ValueClassOption valueClassOption = 
1036            Options.getOption(ValueClassOption.class, opts);
1037          MetadataOption metadataOption = 
1038            Options.getOption(MetadataOption.class, opts);
1039          CompressionOption compressionTypeOption =
1040            Options.getOption(CompressionOption.class, opts);
1041          // check consistency of options
1042          if ((fileOption == null) == (streamOption == null)) {
1043            throw new IllegalArgumentException("file or stream must be specified");
1044          }
1045          if (fileOption == null && (blockSizeOption != null ||
1046                                     bufferSizeOption != null ||
1047                                     replicationOption != null ||
1048                                     progressOption != null)) {
1049            throw new IllegalArgumentException("file modifier options not " +
1050                                               "compatible with stream");
1051          }
1052    
1053          FSDataOutputStream out;
1054          boolean ownStream = fileOption != null;
1055          if (ownStream) {
1056            Path p = fileOption.getValue();
1057            FileSystem fs;
1058            if (fsOption != null) {
1059              fs = fsOption.getValue();
1060            } else {
1061              fs = p.getFileSystem(conf);
1062            }
1063            int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
1064              bufferSizeOption.getValue();
1065            short replication = replicationOption == null ? 
1066              fs.getDefaultReplication(p) :
1067              (short) replicationOption.getValue();
1068            long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) :
1069              blockSizeOption.getValue();
1070            Progressable progress = progressOption == null ? null :
1071              progressOption.getValue();
1072            out = fs.create(p, true, bufferSize, replication, blockSize, progress);
1073          } else {
1074            out = streamOption.getValue();
1075          }
1076          Class<?> keyClass = keyClassOption == null ?
1077              Object.class : keyClassOption.getValue();
1078          Class<?> valueClass = valueClassOption == null ?
1079              Object.class : valueClassOption.getValue();
1080          Metadata metadata = metadataOption == null ?
1081              new Metadata() : metadataOption.getValue();
1082          this.compress = compressionTypeOption.getValue();
1083          final CompressionCodec codec = compressionTypeOption.getCodec();
1084          if (codec != null &&
1085              (codec instanceof GzipCodec) &&
1086              !NativeCodeLoader.isNativeCodeLoaded() &&
1087              !ZlibFactory.isNativeZlibLoaded(conf)) {
1088            throw new IllegalArgumentException("SequenceFile doesn't work with " +
1089                                               "GzipCodec without native-hadoop " +
1090                                               "code!");
1091          }
1092          init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
1093        }
1094    
1095        /** Create the named file.
1096         * @deprecated Use 
1097         *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1098         *   instead.
1099         */
1100        @Deprecated
1101        public Writer(FileSystem fs, Configuration conf, Path name, 
1102                      Class keyClass, Class valClass) throws IOException {
1103          this.compress = CompressionType.NONE;
1104          init(conf, fs.create(name), true, keyClass, valClass, null, 
1105               new Metadata());
1106        }
1107        
1108        /** Create the named file with write-progress reporter.
1109         * @deprecated Use 
1110         *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1111         *   instead.
1112         */
1113        @Deprecated
1114        public Writer(FileSystem fs, Configuration conf, Path name, 
1115                      Class keyClass, Class valClass,
1116                      Progressable progress, Metadata metadata) throws IOException {
1117          this.compress = CompressionType.NONE;
1118          init(conf, fs.create(name, progress), true, keyClass, valClass,
1119               null, metadata);
1120        }
1121        
1122        /** Create the named file with write-progress reporter. 
1123         * @deprecated Use 
1124         *   {@link SequenceFile#createWriter(Configuration, Writer.Option...)} 
1125         *   instead.
1126         */
1127        @Deprecated
1128        public Writer(FileSystem fs, Configuration conf, Path name,
1129                      Class keyClass, Class valClass,
1130                      int bufferSize, short replication, long blockSize,
1131                      Progressable progress, Metadata metadata) throws IOException {
1132          this.compress = CompressionType.NONE;
1133          init(conf,
1134               fs.create(name, true, bufferSize, replication, blockSize, progress),
1135               true, keyClass, valClass, null, metadata);
1136        }
1137    
1138        boolean isCompressed() { return compress != CompressionType.NONE; }
1139        boolean isBlockCompressed() { return compress == CompressionType.BLOCK; }
1140        
1141        Writer ownStream() { this.ownOutputStream = true; return this;  }
1142    
1143        /** Write and flush the file header. */
1144        private void writeFileHeader() 
1145          throws IOException {
1146          out.write(VERSION);
1147          Text.writeString(out, keyClass.getName());
1148          Text.writeString(out, valClass.getName());
1149          
1150          out.writeBoolean(this.isCompressed());
1151          out.writeBoolean(this.isBlockCompressed());
1152          
1153          if (this.isCompressed()) {
1154            Text.writeString(out, (codec.getClass()).getName());
1155          }
1156          this.metadata.write(out);
1157          out.write(sync);                       // write the sync bytes
1158          out.flush();                           // flush header
1159        }
1160        
1161        /** Initialize. */
1162        @SuppressWarnings("unchecked")
1163        void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
1164                  Class keyClass, Class valClass,
1165                  CompressionCodec codec, Metadata metadata) 
1166          throws IOException {
1167          this.conf = conf;
1168          this.out = out;
1169          this.ownOutputStream = ownStream;
1170          this.keyClass = keyClass;
1171          this.valClass = valClass;
1172          this.codec = codec;
1173          this.metadata = metadata;
1174          SerializationFactory serializationFactory = new SerializationFactory(conf);
1175          this.keySerializer = serializationFactory.getSerializer(keyClass);
1176          if (this.keySerializer == null) {
1177            throw new IOException(
1178                "Could not find a serializer for the Key class: '"
1179                    + keyClass.getCanonicalName() + "'. "
1180                    + "Please ensure that the configuration '" +
1181                    CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1182                    + "properly configured, if you're using"
1183                    + "custom serialization.");
1184          }
1185          this.keySerializer.open(buffer);
1186          this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
1187          if (this.uncompressedValSerializer == null) {
1188            throw new IOException(
1189                "Could not find a serializer for the Value class: '"
1190                    + valClass.getCanonicalName() + "'. "
1191                    + "Please ensure that the configuration '" +
1192                    CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1193                    + "properly configured, if you're using"
1194                    + "custom serialization.");
1195          }
1196          this.uncompressedValSerializer.open(buffer);
1197          if (this.codec != null) {
1198            ReflectionUtils.setConf(this.codec, this.conf);
1199            this.compressor = CodecPool.getCompressor(this.codec);
1200            this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
1201            this.deflateOut = 
1202              new DataOutputStream(new BufferedOutputStream(deflateFilter));
1203            this.compressedValSerializer = serializationFactory.getSerializer(valClass);
1204            if (this.compressedValSerializer == null) {
1205              throw new IOException(
1206                  "Could not find a serializer for the Value class: '"
1207                      + valClass.getCanonicalName() + "'. "
1208                      + "Please ensure that the configuration '" +
1209                      CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1210                      + "properly configured, if you're using"
1211                      + "custom serialization.");
1212            }
1213            this.compressedValSerializer.open(deflateOut);
1214          }
1215          writeFileHeader();
1216        }
1217        
1218        /** Returns the class of keys in this file. */
1219        public Class getKeyClass() { return keyClass; }
1220    
1221        /** Returns the class of values in this file. */
1222        public Class getValueClass() { return valClass; }
1223    
1224        /** Returns the compression codec of data in this file. */
1225        public CompressionCodec getCompressionCodec() { return codec; }
1226        
1227        /** create a sync point */
1228        public void sync() throws IOException {
1229          if (sync != null && lastSyncPos != out.getPos()) {
1230            out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
1231            out.write(sync);                          // write sync
1232            lastSyncPos = out.getPos();               // update lastSyncPos
1233          }
1234        }
1235    
1236        /**
1237         * flush all currently written data to the file system
1238         * @deprecated Use {@link #hsync()} or {@link #hflush()} instead
1239         */
1240        @Deprecated
1241        public void syncFs() throws IOException {
1242          if (out != null) {
1243            out.sync();                               // flush contents to file system
1244          }
1245        }
1246    
1247        @Override
1248        public void hsync() throws IOException {
1249          if (out != null) {
1250            out.hsync();
1251          }
1252        }
1253    
1254        @Override
1255        public void hflush() throws IOException {
1256          if (out != null) {
1257            out.hflush();
1258          }
1259        }
1260        
1261        /** Returns the configuration of this file. */
1262        Configuration getConf() { return conf; }
1263        
1264        /** Close the file. */
1265        @Override
1266        public synchronized void close() throws IOException {
1267          keySerializer.close();
1268          uncompressedValSerializer.close();
1269          if (compressedValSerializer != null) {
1270            compressedValSerializer.close();
1271          }
1272    
1273          CodecPool.returnCompressor(compressor);
1274          compressor = null;
1275          
1276          if (out != null) {
1277            
1278            // Close the underlying stream iff we own it...
1279            if (ownOutputStream) {
1280              out.close();
1281            } else {
1282              out.flush();
1283            }
1284            out = null;
1285          }
1286        }
1287    
1288        synchronized void checkAndWriteSync() throws IOException {
1289          if (sync != null &&
1290              out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
1291            sync();
1292          }
1293        }
1294    
1295        /** Append a key/value pair. */
1296        public void append(Writable key, Writable val)
1297          throws IOException {
1298          append((Object) key, (Object) val);
1299        }
1300    
1301        /** Append a key/value pair. */
1302        @SuppressWarnings("unchecked")
1303        public synchronized void append(Object key, Object val)
1304          throws IOException {
1305          if (key.getClass() != keyClass)
1306            throw new IOException("wrong key class: "+key.getClass().getName()
1307                                  +" is not "+keyClass);
1308          if (val.getClass() != valClass)
1309            throw new IOException("wrong value class: "+val.getClass().getName()
1310                                  +" is not "+valClass);
1311    
1312          buffer.reset();
1313    
1314          // Append the 'key'
1315          keySerializer.serialize(key);
1316          int keyLength = buffer.getLength();
1317          if (keyLength < 0)
1318            throw new IOException("negative length keys not allowed: " + key);
1319    
1320          // Append the 'value'
1321          if (compress == CompressionType.RECORD) {
1322            deflateFilter.resetState();
1323            compressedValSerializer.serialize(val);
1324            deflateOut.flush();
1325            deflateFilter.finish();
1326          } else {
1327            uncompressedValSerializer.serialize(val);
1328          }
1329    
1330          // Write the record out
1331          checkAndWriteSync();                                // sync
1332          out.writeInt(buffer.getLength());                   // total record length
1333          out.writeInt(keyLength);                            // key portion length
1334          out.write(buffer.getData(), 0, buffer.getLength()); // data
1335        }
1336    
1337        public synchronized void appendRaw(byte[] keyData, int keyOffset,
1338            int keyLength, ValueBytes val) throws IOException {
1339          if (keyLength < 0)
1340            throw new IOException("negative length keys not allowed: " + keyLength);
1341    
1342          int valLength = val.getSize();
1343    
1344          checkAndWriteSync();
1345          
1346          out.writeInt(keyLength+valLength);          // total record length
1347          out.writeInt(keyLength);                    // key portion length
1348          out.write(keyData, keyOffset, keyLength);   // key
1349          val.writeUncompressedBytes(out);            // value
1350        }
1351    
1352        /** Returns the current length of the output file.
1353         *
1354         * <p>This always returns a synchronized position.  In other words,
1355         * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
1356         * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
1357         * the key may be earlier in the file than key last written when this
1358         * method was called (e.g., with block-compression, it may be the first key
1359         * in the block that was being written when this method was called).
1360         */
1361        public synchronized long getLength() throws IOException {
1362          return out.getPos();
1363        }
1364    
1365      } // class Writer
1366    
1367      /** Write key/compressed-value pairs to a sequence-format file. */
1368      static class RecordCompressWriter extends Writer {
1369        
1370        RecordCompressWriter(Configuration conf, 
1371                             Option... options) throws IOException {
1372          super(conf, options);
1373        }
1374    
1375        /** Append a key/value pair. */
1376        @Override
1377        @SuppressWarnings("unchecked")
1378        public synchronized void append(Object key, Object val)
1379          throws IOException {
1380          if (key.getClass() != keyClass)
1381            throw new IOException("wrong key class: "+key.getClass().getName()
1382                                  +" is not "+keyClass);
1383          if (val.getClass() != valClass)
1384            throw new IOException("wrong value class: "+val.getClass().getName()
1385                                  +" is not "+valClass);
1386    
1387          buffer.reset();
1388    
1389          // Append the 'key'
1390          keySerializer.serialize(key);
1391          int keyLength = buffer.getLength();
1392          if (keyLength < 0)
1393            throw new IOException("negative length keys not allowed: " + key);
1394    
1395          // Compress 'value' and append it
1396          deflateFilter.resetState();
1397          compressedValSerializer.serialize(val);
1398          deflateOut.flush();
1399          deflateFilter.finish();
1400    
1401          // Write the record out
1402          checkAndWriteSync();                                // sync
1403          out.writeInt(buffer.getLength());                   // total record length
1404          out.writeInt(keyLength);                            // key portion length
1405          out.write(buffer.getData(), 0, buffer.getLength()); // data
1406        }
1407    
1408        /** Append a key/value pair. */
1409        @Override
1410        public synchronized void appendRaw(byte[] keyData, int keyOffset,
1411            int keyLength, ValueBytes val) throws IOException {
1412    
1413          if (keyLength < 0)
1414            throw new IOException("negative length keys not allowed: " + keyLength);
1415    
1416          int valLength = val.getSize();
1417          
1418          checkAndWriteSync();                        // sync
1419          out.writeInt(keyLength+valLength);          // total record length
1420          out.writeInt(keyLength);                    // key portion length
1421          out.write(keyData, keyOffset, keyLength);   // 'key' data
1422          val.writeCompressedBytes(out);              // 'value' data
1423        }
1424        
1425      } // RecordCompressionWriter
1426    
1427      /** Write compressed key/value blocks to a sequence-format file. */
1428      static class BlockCompressWriter extends Writer {
1429        
1430        private int noBufferedRecords = 0;
1431        
1432        private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
1433        private DataOutputBuffer keyBuffer = new DataOutputBuffer();
1434    
1435        private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
1436        private DataOutputBuffer valBuffer = new DataOutputBuffer();
1437    
1438        private final int compressionBlockSize;
1439        
    BlockCompressWriter(Configuration conf,
                        Option... options) throws IOException {
      super(conf, options);
      // Target (uncompressed) size of a buffered key/value block before it
      // is compressed and flushed to the underlying stream.
      compressionBlockSize = 
        conf.getInt("io.seqfile.compress.blocksize", 1000000);
      // The superclass opened both serializers on the shared record buffer;
      // block compression accumulates keys and values in separate buffers,
      // so re-open the serializers on those instead.
      keySerializer.close();
      keySerializer.open(keyBuffer);
      uncompressedValSerializer.close();
      uncompressedValSerializer.open(valBuffer);
    }
1450    
    /** Workhorse to check and write out compressed data/lengths */
    private synchronized 
      void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
      throws IOException {
      // Reset the compressor state and the shared scratch buffer, then push
      // the uncompressed bytes through the codec; 'buffer' collects the
      // compressed output via deflateFilter.
      deflateFilter.resetState();
      buffer.reset();
      deflateOut.write(uncompressedDataBuffer.getData(), 0, 
                       uncompressedDataBuffer.getLength());
      deflateOut.flush();
      deflateFilter.finish();
      
      // Emit the compressed size followed by the compressed bytes.
      WritableUtils.writeVInt(out, buffer.getLength());
      out.write(buffer.getData(), 0, buffer.getLength());
    }
1465        
    /** Compress and flush contents to dfs */
    @Override
    public synchronized void sync() throws IOException {
      if (noBufferedRecords > 0) {
        // Write the base-class sync marker first so readers can re-align
        // at block boundaries.
        super.sync();
        
        // No. of records
        WritableUtils.writeVInt(out, noBufferedRecords);
        
        // Write 'keys' and lengths
        writeBuffer(keyLenBuffer);
        writeBuffer(keyBuffer);
        
        // Write 'values' and lengths
        writeBuffer(valLenBuffer);
        writeBuffer(valBuffer);
        
        // Flush the file-stream
        out.flush();
        
        // Reset internal states
        keyLenBuffer.reset();
        keyBuffer.reset();
        valLenBuffer.reset();
        valBuffer.reset();
        noBufferedRecords = 0;
      }
      
    }
1495        
    /** Close the file. */
    @Override
    public synchronized void close() throws IOException {
      // Flush any records still buffered in memory before the underlying
      // stream is closed by the parent implementation.
      if (out != null) {
        sync();
      }
      super.close();
    }
1504    
1505        /** Append a key/value pair. */
1506        @Override
1507        @SuppressWarnings("unchecked")
1508        public synchronized void append(Object key, Object val)
1509          throws IOException {
1510          if (key.getClass() != keyClass)
1511            throw new IOException("wrong key class: "+key+" is not "+keyClass);
1512          if (val.getClass() != valClass)
1513            throw new IOException("wrong value class: "+val+" is not "+valClass);
1514    
1515          // Save key/value into respective buffers 
1516          int oldKeyLength = keyBuffer.getLength();
1517          keySerializer.serialize(key);
1518          int keyLength = keyBuffer.getLength() - oldKeyLength;
1519          if (keyLength < 0)
1520            throw new IOException("negative length keys not allowed: " + key);
1521          WritableUtils.writeVInt(keyLenBuffer, keyLength);
1522    
1523          int oldValLength = valBuffer.getLength();
1524          uncompressedValSerializer.serialize(val);
1525          int valLength = valBuffer.getLength() - oldValLength;
1526          WritableUtils.writeVInt(valLenBuffer, valLength);
1527          
1528          // Added another key/value pair
1529          ++noBufferedRecords;
1530          
1531          // Compress and flush?
1532          int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
1533          if (currentBlockSize >= compressionBlockSize) {
1534            sync();
1535          }
1536        }
1537        
1538        /** Append a key/value pair. */
1539        @Override
1540        public synchronized void appendRaw(byte[] keyData, int keyOffset,
1541            int keyLength, ValueBytes val) throws IOException {
1542          
1543          if (keyLength < 0)
1544            throw new IOException("negative length keys not allowed");
1545    
1546          int valLength = val.getSize();
1547          
1548          // Save key/value data in relevant buffers
1549          WritableUtils.writeVInt(keyLenBuffer, keyLength);
1550          keyBuffer.write(keyData, keyOffset, keyLength);
1551          WritableUtils.writeVInt(valLenBuffer, valLength);
1552          val.writeUncompressedBytes(valBuffer);
1553    
1554          // Added another key/value pair
1555          ++noBufferedRecords;
1556    
1557          // Compress and flush?
1558          int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
1559          if (currentBlockSize >= compressionBlockSize) {
1560            sync();
1561          }
1562        }
1563      
1564      } // BlockCompressionWriter
1565    
  /** Get the configured buffer size */
  private static int getBufferSize(Configuration conf) {
    // Defaults to 4KB when "io.file.buffer.size" is not set.
    return conf.getInt("io.file.buffer.size", 4096);
  }
1570    
1571      /** Reads key/value pairs from a sequence-format file. */
1572      public static class Reader implements java.io.Closeable {
    // Display name of the source ("<unknown>" when reading from a stream).
    private String filename;
    private FSDataInputStream in;
    // Scratch buffer used when reading raw (non-block-compressed) records.
    private DataOutputBuffer outBuf = new DataOutputBuffer();

    // Format version byte taken from the 4th byte of the header VERSION.
    private byte version;

    // Key/value classes as named in the header, resolved lazily.
    private String keyClassName;
    private String valClassName;
    private Class keyClass;
    private Class valClass;

    private CompressionCodec codec = null;
    private Metadata metadata = null;
    
    // Sync marker read from the header, and a scratch buffer used to verify
    // sync markers encountered in the body of the file.
    private byte[] sync = new byte[SYNC_HASH_SIZE];
    private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
    private boolean syncSeen;

    // Stream positions: first byte after the header, and last byte to read.
    private long headerEnd;
    private long end;
    private int keyLength;
    private int recordLength;

    // Compression flags decoded from the header.
    private boolean decompress;
    private boolean blockCompressed;
    
    private Configuration conf;

    // Block-compression bookkeeping: records/keys/values remaining in the
    // current block, and whether value decompression is deferred (lazy).
    private int noBufferedRecords = 0;
    private boolean lazyDecompress = true;
    private boolean valuesDecompressed = true;
    
    private int noBufferedKeys = 0;
    private int noBufferedValues = 0;
    
    // Decompression chain for the key-length and key streams of a block.
    private DataInputBuffer keyLenBuffer = null;
    private CompressionInputStream keyLenInFilter = null;
    private DataInputStream keyLenIn = null;
    private Decompressor keyLenDecompressor = null;
    private DataInputBuffer keyBuffer = null;
    private CompressionInputStream keyInFilter = null;
    private DataInputStream keyIn = null;
    private Decompressor keyDecompressor = null;

    // Decompression chain for the value-length and value streams.
    private DataInputBuffer valLenBuffer = null;
    private CompressionInputStream valLenInFilter = null;
    private DataInputStream valLenIn = null;
    private Decompressor valLenDecompressor = null;
    private DataInputBuffer valBuffer = null;
    private CompressionInputStream valInFilter = null;
    private DataInputStream valIn = null;
    private Decompressor valDecompressor = null;
    
    // Deserializers opened over the appropriate (possibly decompressed)
    // key/value input streams in init().
    private Deserializer keyDeserializer;
    private Deserializer valDeserializer;
1628    
1629        /**
1630         * A tag interface for all of the Reader options
1631         */
1632        public static interface Option {}
1633        
1634        /**
1635         * Create an option to specify the path name of the sequence file.
1636         * @param value the path to read
1637         * @return a new option
1638         */
1639        public static Option file(Path value) {
1640          return new FileOption(value);
1641        }
1642        
1643        /**
1644         * Create an option to specify the stream with the sequence file.
1645         * @param value the stream to read.
1646         * @return a new option
1647         */
1648        public static Option stream(FSDataInputStream value) {
1649          return new InputStreamOption(value);
1650        }
1651        
1652        /**
1653         * Create an option to specify the starting byte to read.
1654         * @param value the number of bytes to skip over
1655         * @return a new option
1656         */
1657        public static Option start(long value) {
1658          return new StartOption(value);
1659        }
1660        
1661        /**
1662         * Create an option to specify the number of bytes to read.
1663         * @param value the number of bytes to read
1664         * @return a new option
1665         */
1666        public static Option length(long value) {
1667          return new LengthOption(value);
1668        }
1669        
1670        /**
1671         * Create an option with the buffer size for reading the given pathname.
1672         * @param value the number of bytes to buffer
1673         * @return a new option
1674         */
1675        public static Option bufferSize(int value) {
1676          return new BufferSizeOption(value);
1677        }
1678    
1679        private static class FileOption extends Options.PathOption 
1680                                        implements Option {
1681          private FileOption(Path value) {
1682            super(value);
1683          }
1684        }
1685        
1686        private static class InputStreamOption
1687            extends Options.FSDataInputStreamOption 
1688            implements Option {
1689          private InputStreamOption(FSDataInputStream value) {
1690            super(value);
1691          }
1692        }
1693    
1694        private static class StartOption extends Options.LongOption
1695                                         implements Option {
1696          private StartOption(long value) {
1697            super(value);
1698          }
1699        }
1700    
1701        private static class LengthOption extends Options.LongOption
1702                                          implements Option {
1703          private LengthOption(long value) {
1704            super(value);
1705          }
1706        }
1707    
1708        private static class BufferSizeOption extends Options.IntegerOption
1709                                          implements Option {
1710          private BufferSizeOption(int value) {
1711            super(value);
1712          }
1713        }
1714    
1715        // only used directly
1716        private static class OnlyHeaderOption extends Options.BooleanOption 
1717                                              implements Option {
1718          private OnlyHeaderOption() {
1719            super(true);
1720          }
1721        }
1722    
1723        public Reader(Configuration conf, Option... opts) throws IOException {
1724          // Look up the options, these are null if not set
1725          FileOption fileOpt = Options.getOption(FileOption.class, opts);
1726          InputStreamOption streamOpt = 
1727            Options.getOption(InputStreamOption.class, opts);
1728          StartOption startOpt = Options.getOption(StartOption.class, opts);
1729          LengthOption lenOpt = Options.getOption(LengthOption.class, opts);
1730          BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts);
1731          OnlyHeaderOption headerOnly = 
1732            Options.getOption(OnlyHeaderOption.class, opts);
1733          // check for consistency
1734          if ((fileOpt == null) == (streamOpt == null)) {
1735            throw new 
1736              IllegalArgumentException("File or stream option must be specified");
1737          }
1738          if (fileOpt == null && bufOpt != null) {
1739            throw new IllegalArgumentException("buffer size can only be set when" +
1740                                               " a file is specified.");
1741          }
1742          // figure out the real values
1743          Path filename = null;
1744          FSDataInputStream file;
1745          final long len;
1746          if (fileOpt != null) {
1747            filename = fileOpt.getValue();
1748            FileSystem fs = filename.getFileSystem(conf);
1749            int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue();
1750            len = null == lenOpt
1751              ? fs.getFileStatus(filename).getLen()
1752              : lenOpt.getValue();
1753            file = openFile(fs, filename, bufSize, len);
1754          } else {
1755            len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
1756            file = streamOpt.getValue();
1757          }
1758          long start = startOpt == null ? 0 : startOpt.getValue();
1759          // really set up
1760          initialize(filename, file, start, len, conf, headerOnly != null);
1761        }
1762    
1763        /**
1764         * Construct a reader by opening a file from the given file system.
1765         * @param fs The file system used to open the file.
1766         * @param file The file being read.
1767         * @param conf Configuration
1768         * @throws IOException
1769         * @deprecated Use Reader(Configuration, Option...) instead.
1770         */
1771        @Deprecated
1772        public Reader(FileSystem fs, Path file, 
1773                      Configuration conf) throws IOException {
1774          this(conf, file(file.makeQualified(fs)));
1775        }
1776    
1777        /**
1778         * Construct a reader by the given input stream.
1779         * @param in An input stream.
1780         * @param buffersize unused
1781         * @param start The starting position.
1782         * @param length The length being read.
1783         * @param conf Configuration
1784         * @throws IOException
1785         * @deprecated Use Reader(Configuration, Reader.Option...) instead.
1786         */
1787        @Deprecated
1788        public Reader(FSDataInputStream in, int buffersize,
1789            long start, long length, Configuration conf) throws IOException {
1790          this(conf, stream(in), start(start), length(length));
1791        }
1792    
    /**
     * Common work of the constructors: record the stream and configuration,
     * seek to the requested start, compute the end position and parse the
     * header via {@link #init(boolean)}.  On any failure the input stream is
     * closed so a throwing constructor does not leak it.
     *
     * @param filename source path, or null when reading from a raw stream
     * @param in the stream positioned at the start of the sequence file
     * @param start byte offset to seek to before reading
     * @param length number of bytes to read from <code>start</code>
     * @param conf the configuration to use
     * @param tempReader true to read only the header (temporary reader)
     * @throws IOException if seeking or header parsing fails
     */
    private void initialize(Path filename, FSDataInputStream in,
                            long start, long length, Configuration conf,
                            boolean tempReader) throws IOException {
      if (in == null) {
        throw new IllegalArgumentException("in == null");
      }
      this.filename = filename == null ? "<unknown>" : filename.toString();
      this.in = in;
      this.conf = conf;
      boolean succeeded = false;
      try {
        seek(start);
        this.end = this.in.getPos() + length;
        // if it wrapped around, use the max
        if (end < length) {
          end = Long.MAX_VALUE;
        }
        init(tempReader);
        succeeded = true;
      } finally {
        // Close the stream on any failure path so the constructor cannot
        // leak the open file handle.
        if (!succeeded) {
          IOUtils.cleanup(LOG, this.in);
        }
      }
    }
1819    
1820        /**
1821         * Override this method to specialize the type of
1822         * {@link FSDataInputStream} returned.
1823         * @param fs The file system used to open the file.
1824         * @param file The file being read.
1825         * @param bufferSize The buffer size used to read the file.
1826         * @param length The length being read if it is >= 0.  Otherwise,
1827         *               the length is not available.
1828         * @return The opened stream.
1829         * @throws IOException
1830         */
1831        protected FSDataInputStream openFile(FileSystem fs, Path file,
1832            int bufferSize, long length) throws IOException {
1833          return fs.open(file, bufferSize);
1834        }
1835        
1836        /**
1837         * Initialize the {@link Reader}
1838         * @param tmpReader <code>true</code> if we are constructing a temporary
1839         *                  reader {@link SequenceFile.Sorter.cloneFileAttributes}, 
1840         *                  and hence do not initialize every component; 
1841         *                  <code>false</code> otherwise.
1842         * @throws IOException
1843         */
1844        private void init(boolean tempReader) throws IOException {
1845          byte[] versionBlock = new byte[VERSION.length];
1846          in.readFully(versionBlock);
1847    
1848          if ((versionBlock[0] != VERSION[0]) ||
1849              (versionBlock[1] != VERSION[1]) ||
1850              (versionBlock[2] != VERSION[2]))
1851            throw new IOException(this + " not a SequenceFile");
1852    
1853          // Set 'version'
1854          version = versionBlock[3];
1855          if (version > VERSION[3])
1856            throw new VersionMismatchException(VERSION[3], version);
1857    
1858          if (version < BLOCK_COMPRESS_VERSION) {
1859            UTF8 className = new UTF8();
1860    
1861            className.readFields(in);
1862            keyClassName = className.toStringChecked(); // key class name
1863    
1864            className.readFields(in);
1865            valClassName = className.toStringChecked(); // val class name
1866          } else {
1867            keyClassName = Text.readString(in);
1868            valClassName = Text.readString(in);
1869          }
1870    
1871          if (version > 2) {                          // if version > 2
1872            this.decompress = in.readBoolean();       // is compressed?
1873          } else {
1874            decompress = false;
1875          }
1876    
1877          if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
1878            this.blockCompressed = in.readBoolean();  // is block-compressed?
1879          } else {
1880            blockCompressed = false;
1881          }
1882          
1883          // if version >= 5
1884          // setup the compression codec
1885          if (decompress) {
1886            if (version >= CUSTOM_COMPRESS_VERSION) {
1887              String codecClassname = Text.readString(in);
1888              try {
1889                Class<? extends CompressionCodec> codecClass
1890                  = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
1891                this.codec = ReflectionUtils.newInstance(codecClass, conf);
1892              } catch (ClassNotFoundException cnfe) {
1893                throw new IllegalArgumentException("Unknown codec: " + 
1894                                                   codecClassname, cnfe);
1895              }
1896            } else {
1897              codec = new DefaultCodec();
1898              ((Configurable)codec).setConf(conf);
1899            }
1900          }
1901          
1902          this.metadata = new Metadata();
1903          if (version >= VERSION_WITH_METADATA) {    // if version >= 6
1904            this.metadata.readFields(in);
1905          }
1906          
1907          if (version > 1) {                          // if version > 1
1908            in.readFully(sync);                       // read sync bytes
1909            headerEnd = in.getPos();                  // record end of header
1910          }
1911          
1912          // Initialize... *not* if this we are constructing a temporary Reader
1913          if (!tempReader) {
1914            valBuffer = new DataInputBuffer();
1915            if (decompress) {
1916              valDecompressor = CodecPool.getDecompressor(codec);
1917              valInFilter = codec.createInputStream(valBuffer, valDecompressor);
1918              valIn = new DataInputStream(valInFilter);
1919            } else {
1920              valIn = valBuffer;
1921            }
1922    
1923            if (blockCompressed) {
1924              keyLenBuffer = new DataInputBuffer();
1925              keyBuffer = new DataInputBuffer();
1926              valLenBuffer = new DataInputBuffer();
1927    
1928              keyLenDecompressor = CodecPool.getDecompressor(codec);
1929              keyLenInFilter = codec.createInputStream(keyLenBuffer, 
1930                                                       keyLenDecompressor);
1931              keyLenIn = new DataInputStream(keyLenInFilter);
1932    
1933              keyDecompressor = CodecPool.getDecompressor(codec);
1934              keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
1935              keyIn = new DataInputStream(keyInFilter);
1936    
1937              valLenDecompressor = CodecPool.getDecompressor(codec);
1938              valLenInFilter = codec.createInputStream(valLenBuffer, 
1939                                                       valLenDecompressor);
1940              valLenIn = new DataInputStream(valLenInFilter);
1941            }
1942            
1943            SerializationFactory serializationFactory =
1944              new SerializationFactory(conf);
1945            this.keyDeserializer =
1946              getDeserializer(serializationFactory, getKeyClass());
1947            if (this.keyDeserializer == null) {
1948              throw new IOException(
1949                  "Could not find a deserializer for the Key class: '"
1950                      + getKeyClass().getCanonicalName() + "'. "
1951                      + "Please ensure that the configuration '" +
1952                      CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1953                      + "properly configured, if you're using "
1954                      + "custom serialization.");
1955            }
1956            if (!blockCompressed) {
1957              this.keyDeserializer.open(valBuffer);
1958            } else {
1959              this.keyDeserializer.open(keyIn);
1960            }
1961            this.valDeserializer =
1962              getDeserializer(serializationFactory, getValueClass());
1963            if (this.valDeserializer == null) {
1964              throw new IOException(
1965                  "Could not find a deserializer for the Value class: '"
1966                      + getValueClass().getCanonicalName() + "'. "
1967                      + "Please ensure that the configuration '" +
1968                      CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "' is "
1969                      + "properly configured, if you're using "
1970                      + "custom serialization.");
1971            }
1972            this.valDeserializer.open(valIn);
1973          }
1974        }
1975        
    // Helper that confines the unavoidable unchecked call to the raw-typed
    // SerializationFactory to the smallest possible scope.
    @SuppressWarnings("unchecked")
    private Deserializer getDeserializer(SerializationFactory sf, Class c) {
      return sf.getDeserializer(c);
    }
1980        
1981        /** Close the file. */
1982        @Override
1983        public synchronized void close() throws IOException {
1984          // Return the decompressors to the pool
1985          CodecPool.returnDecompressor(keyLenDecompressor);
1986          CodecPool.returnDecompressor(keyDecompressor);
1987          CodecPool.returnDecompressor(valLenDecompressor);
1988          CodecPool.returnDecompressor(valDecompressor);
1989          keyLenDecompressor = keyDecompressor = null;
1990          valLenDecompressor = valDecompressor = null;
1991          
1992          if (keyDeserializer != null) {
1993            keyDeserializer.close();
1994          }
1995          if (valDeserializer != null) {
1996            valDeserializer.close();
1997          }
1998          
1999          // Close the input-stream
2000          in.close();
2001        }
2002    
    /** Returns the name of the key class, as recorded in the file header. */
    public String getKeyClassName() {
      return keyClassName;
    }
2007    
2008        /** Returns the class of keys in this file. */
2009        public synchronized Class<?> getKeyClass() {
2010          if (null == keyClass) {
2011            try {
2012              keyClass = WritableName.getClass(getKeyClassName(), conf);
2013            } catch (IOException e) {
2014              throw new RuntimeException(e);
2015            }
2016          }
2017          return keyClass;
2018        }
2019    
    /** Returns the name of the value class, as recorded in the file header. */
    public String getValueClassName() {
      return valClassName;
    }
2024    
2025        /** Returns the class of values in this file. */
2026        public synchronized Class<?> getValueClass() {
2027          if (null == valClass) {
2028            try {
2029              valClass = WritableName.getClass(getValueClassName(), conf);
2030            } catch (IOException e) {
2031              throw new RuntimeException(e);
2032            }
2033          }
2034          return valClass;
2035        }
2036    
    /** Returns true if values are compressed. */
    public boolean isCompressed() { return decompress; }
    
    /** Returns true if records are block-compressed. */
    public boolean isBlockCompressed() { return blockCompressed; }
    
    /** Returns the compression codec of data in this file,
     * or null when the file is uncompressed. */
    public CompressionCodec getCompressionCodec() { return codec; }
2045        
2046        /**
2047         * Get the compression type for this file.
2048         * @return the compression type
2049         */
2050        public CompressionType getCompressionType() {
2051          if (decompress) {
2052            return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD;
2053          } else {
2054            return CompressionType.NONE;
2055          }
2056        }
2057    
    /** Returns the metadata object of the file */
    public Metadata getMetadata() {
      return this.metadata;
    }
    
    /** Returns the configuration used for this file. */
    Configuration getConf() { return conf; }
2065        
2066        /** Read a compressed buffer */
2067        private synchronized void readBuffer(DataInputBuffer buffer, 
2068                                             CompressionInputStream filter) throws IOException {
2069          // Read data into a temporary buffer
2070          DataOutputBuffer dataBuffer = new DataOutputBuffer();
2071    
2072          try {
2073            int dataBufferLength = WritableUtils.readVInt(in);
2074            dataBuffer.write(in, dataBufferLength);
2075          
2076            // Set up 'buffer' connected to the input-stream
2077            buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
2078          } finally {
2079            dataBuffer.close();
2080          }
2081    
2082          // Reset the codec
2083          filter.resetState();
2084        }
2085        
    /** Read the next 'compressed' block.
     * Skips any still-unread values of the previous block (lazy
     * decompression), verifies the sync marker, then reads the record
     * count and the key-length/key buffers.  Values are read here only
     * when lazy decompression is off; otherwise they are deferred to
     * {@link #seekToCurrentValue()}.
     */
    private synchronized void readBlock() throws IOException {
      // Check if we need to throw away a whole block of 
      // 'values' due to 'lazy decompression' 
      if (lazyDecompress && !valuesDecompressed) {
        // Two seeks: skip the value-lengths buffer, then the values buffer,
        // each prefixed by its vint length.
        in.seek(WritableUtils.readVInt(in)+in.getPos());
        in.seek(WritableUtils.readVInt(in)+in.getPos());
      }
      
      // Reset internal states
      noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
      valuesDecompressed = false;

      //Process sync: a -1 record-length marker followed by the sync hash.
      if (sync != null) {
        in.readInt();
        in.readFully(syncCheck);                // read syncCheck
        if (!Arrays.equals(sync, syncCheck))    // check it
          throw new IOException("File is corrupt!");
      }
      syncSeen = true;

      // Read number of records in this block
      noBufferedRecords = WritableUtils.readVInt(in);
      
      // Read key lengths and keys
      readBuffer(keyLenBuffer, keyLenInFilter);
      readBuffer(keyBuffer, keyInFilter);
      noBufferedKeys = noBufferedRecords;
      
      // Read value lengths and values
      if (!lazyDecompress) {
        readBuffer(valLenBuffer, valLenInFilter);
        readBuffer(valBuffer, valInFilter);
        noBufferedValues = noBufferedRecords;
        valuesDecompressed = true;
      }
    }
2124    
2125        /** 
2126         * Position valLenIn/valIn to the 'value' 
2127         * corresponding to the 'current' key 
2128         */
2129        private synchronized void seekToCurrentValue() throws IOException {
2130          if (!blockCompressed) {
2131            if (decompress) {
2132              valInFilter.resetState();
2133            }
2134            valBuffer.reset();
2135          } else {
2136            // Check if this is the first value in the 'block' to be read
2137            if (lazyDecompress && !valuesDecompressed) {
2138              // Read the value lengths and values
2139              readBuffer(valLenBuffer, valLenInFilter);
2140              readBuffer(valBuffer, valInFilter);
2141              noBufferedValues = noBufferedRecords;
2142              valuesDecompressed = true;
2143            }
2144            
2145            // Calculate the no. of bytes to skip
2146            // Note: 'current' key has already been read!
2147            int skipValBytes = 0;
2148            int currentKey = noBufferedKeys + 1;          
2149            for (int i=noBufferedValues; i > currentKey; --i) {
2150              skipValBytes += WritableUtils.readVInt(valLenIn);
2151              --noBufferedValues;
2152            }
2153            
2154            // Skip to the 'val' corresponding to 'current' key
2155            if (skipValBytes > 0) {
2156              if (valIn.skipBytes(skipValBytes) != skipValBytes) {
2157                throw new IOException("Failed to seek to " + currentKey + 
2158                                      "(th) value!");
2159              }
2160            }
2161          }
2162        }
2163    
2164        /**
2165         * Get the 'value' corresponding to the last read 'key'.
2166         * @param val : The 'value' to be read.
2167         * @throws IOException
2168         */
2169        public synchronized void getCurrentValue(Writable val) 
2170          throws IOException {
2171          if (val instanceof Configurable) {
2172            ((Configurable) val).setConf(this.conf);
2173          }
2174    
2175          // Position stream to 'current' value
2176          seekToCurrentValue();
2177    
2178          if (!blockCompressed) {
2179            val.readFields(valIn);
2180            
2181            if (valIn.read() > 0) {
2182              LOG.info("available bytes: " + valIn.available());
2183              throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2184                                    + " bytes, should read " +
2185                                    (valBuffer.getLength()-keyLength));
2186            }
2187          } else {
2188            // Get the value
2189            int valLength = WritableUtils.readVInt(valLenIn);
2190            val.readFields(valIn);
2191            
2192            // Read another compressed 'value'
2193            --noBufferedValues;
2194            
2195            // Sanity check
2196            if ((valLength < 0) && LOG.isDebugEnabled()) {
2197              LOG.debug(val + " is a zero-length value");
2198            }
2199          }
2200    
2201        }
2202        
2203        /**
2204         * Get the 'value' corresponding to the last read 'key'.
2205         * @param val : The 'value' to be read.
2206         * @throws IOException
2207         */
2208        public synchronized Object getCurrentValue(Object val) 
2209          throws IOException {
2210          if (val instanceof Configurable) {
2211            ((Configurable) val).setConf(this.conf);
2212          }
2213    
2214          // Position stream to 'current' value
2215          seekToCurrentValue();
2216    
2217          if (!blockCompressed) {
2218            val = deserializeValue(val);
2219            
2220            if (valIn.read() > 0) {
2221              LOG.info("available bytes: " + valIn.available());
2222              throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
2223                                    + " bytes, should read " +
2224                                    (valBuffer.getLength()-keyLength));
2225            }
2226          } else {
2227            // Get the value
2228            int valLength = WritableUtils.readVInt(valLenIn);
2229            val = deserializeValue(val);
2230            
2231            // Read another compressed 'value'
2232            --noBufferedValues;
2233            
2234            // Sanity check
2235            if ((valLength < 0) && LOG.isDebugEnabled()) {
2236              LOG.debug(val + " is a zero-length value");
2237            }
2238          }
2239          return val;
2240    
2241        }
2242    
    // Narrow-scope wrapper for the unchecked call into the raw-typed
    // value deserializer.
    @SuppressWarnings("unchecked")
    private Object deserializeValue(Object val) throws IOException {
      return valDeserializer.deserialize(val);
    }
2247        
    /** Read the next key in the file into <code>key</code>, skipping its
     * value.  True if another entry exists, and false at end of file. */
    public synchronized boolean next(Writable key) throws IOException {
      if (key.getClass() != getKeyClass())
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);

      if (!blockCompressed) {
        outBuf.reset();
        
        // Read the whole record (key + value) into outBuf; the field
        // keyLength records where the key ends.
        keyLength = next(outBuf);
        if (keyLength < 0)
          return false;
        
        valBuffer.reset(outBuf.getData(), outBuf.getLength());
        
        key.readFields(valBuffer);
        valBuffer.mark(0);
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
                                + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;
        
        // Refill from the next compressed block when this one is exhausted;
        // EOF while reading a block header means end of file.
        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return false;
          }
        }
        
        // NOTE(review): this local shadows the keyLength field, so the
        // field is not updated on the block-compressed path — confirm
        // callers on this path do not rely on the field.
        int keyLength = WritableUtils.readVInt(keyLenIn);
        
        // Sanity check
        if (keyLength < 0) {
          return false;
        }
        
        //Read another compressed 'key'
        key.readFields(keyIn);
        --noBufferedKeys;
      }

      return true;
    }
2295    
2296        /** Read the next key/value pair in the file into <code>key</code> and
2297         * <code>val</code>.  Returns true if such a pair exists and false when at
2298         * end of file */
2299        public synchronized boolean next(Writable key, Writable val)
2300          throws IOException {
2301          if (val.getClass() != getValueClass())
2302            throw new IOException("wrong value class: "+val+" is not "+valClass);
2303    
2304          boolean more = next(key);
2305          
2306          if (more) {
2307            getCurrentValue(val);
2308          }
2309    
2310          return more;
2311        }
2312        
2313        /**
2314         * Read and return the next record length, potentially skipping over 
2315         * a sync block.
2316         * @return the length of the next record or -1 if there is no next record
2317         * @throws IOException
2318         */
2319        private synchronized int readRecordLength() throws IOException {
2320          if (in.getPos() >= end) {
2321            return -1;
2322          }      
2323          int length = in.readInt();
2324          if (version > 1 && sync != null &&
2325              length == SYNC_ESCAPE) {              // process a sync entry
2326            in.readFully(syncCheck);                // read syncCheck
2327            if (!Arrays.equals(sync, syncCheck))    // check it
2328              throw new IOException("File is corrupt!");
2329            syncSeen = true;
2330            if (in.getPos() >= end) {
2331              return -1;
2332            }
2333            length = in.readInt();                  // re-read length
2334          } else {
2335            syncSeen = false;
2336          }
2337          
2338          return length;
2339        }
2340        
2341        /** Read the next key/value pair in the file into <code>buffer</code>.
2342         * Returns the length of the key read, or -1 if at end of file.  The length
2343         * of the value may be computed by calling buffer.getLength() before and
2344         * after calls to this method. */
2345        /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
2346        @Deprecated
2347        synchronized int next(DataOutputBuffer buffer) throws IOException {
2348          // Unsupported for block-compressed sequence files
2349          if (blockCompressed) {
2350            throw new IOException("Unsupported call for block-compressed" +
2351                                  " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
2352          }
2353          try {
2354            int length = readRecordLength();
2355            if (length == -1) {
2356              return -1;
2357            }
2358            int keyLength = in.readInt();
2359            buffer.write(in, length);
2360            return keyLength;
2361          } catch (ChecksumException e) {             // checksum failure
2362            handleChecksumException(e);
2363            return next(buffer);
2364          }
2365        }
2366    
2367        public ValueBytes createValueBytes() {
2368          ValueBytes val = null;
2369          if (!decompress || blockCompressed) {
2370            val = new UncompressedBytes();
2371          } else {
2372            val = new CompressedBytes(codec);
2373          }
2374          return val;
2375        }
2376    
2377        /**
2378         * Read 'raw' records.
2379         * @param key - The buffer into which the key is read
2380         * @param val - The 'raw' value
2381         * @return Returns the total record length or -1 for end of file
2382         * @throws IOException
2383         */
2384        public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
2385          throws IOException {
2386          if (!blockCompressed) {
2387            int length = readRecordLength();
2388            if (length == -1) {
2389              return -1;
2390            }
2391            int keyLength = in.readInt();
2392            int valLength = length - keyLength;
2393            key.write(in, keyLength);
2394            if (decompress) {
2395              CompressedBytes value = (CompressedBytes)val;
2396              value.reset(in, valLength);
2397            } else {
2398              UncompressedBytes value = (UncompressedBytes)val;
2399              value.reset(in, valLength);
2400            }
2401            
2402            return length;
2403          } else {
2404            //Reset syncSeen
2405            syncSeen = false;
2406            
2407            // Read 'key'
2408            if (noBufferedKeys == 0) {
2409              if (in.getPos() >= end) 
2410                return -1;
2411    
2412              try { 
2413                readBlock();
2414              } catch (EOFException eof) {
2415                return -1;
2416              }
2417            }
2418            int keyLength = WritableUtils.readVInt(keyLenIn);
2419            if (keyLength < 0) {
2420              throw new IOException("zero length key found!");
2421            }
2422            key.write(keyIn, keyLength);
2423            --noBufferedKeys;
2424            
2425            // Read raw 'value'
2426            seekToCurrentValue();
2427            int valLength = WritableUtils.readVInt(valLenIn);
2428            UncompressedBytes rawValue = (UncompressedBytes)val;
2429            rawValue.reset(valIn, valLength);
2430            --noBufferedValues;
2431            
2432            return (keyLength+valLength);
2433          }
2434          
2435        }
2436    
2437        /**
2438         * Read 'raw' keys.
2439         * @param key - The buffer into which the key is read
2440         * @return Returns the key length or -1 for end of file
2441         * @throws IOException
2442         */
2443        public synchronized int nextRawKey(DataOutputBuffer key) 
2444          throws IOException {
2445          if (!blockCompressed) {
2446            recordLength = readRecordLength();
2447            if (recordLength == -1) {
2448              return -1;
2449            }
2450            keyLength = in.readInt();
2451            key.write(in, keyLength);
2452            return keyLength;
2453          } else {
2454            //Reset syncSeen
2455            syncSeen = false;
2456            
2457            // Read 'key'
2458            if (noBufferedKeys == 0) {
2459              if (in.getPos() >= end) 
2460                return -1;
2461    
2462              try { 
2463                readBlock();
2464              } catch (EOFException eof) {
2465                return -1;
2466              }
2467            }
2468            int keyLength = WritableUtils.readVInt(keyLenIn);
2469            if (keyLength < 0) {
2470              throw new IOException("zero length key found!");
2471            }
2472            key.write(keyIn, keyLength);
2473            --noBufferedKeys;
2474            
2475            return keyLength;
2476          }
2477          
2478        }
2479    
    /** Read the next key in the file, skipping its
     * value.  Return null at end of file.
     * @param key object to reuse when deserializing, or null to allocate
     * @throws IOException if key is non-null and of the wrong class, or the
     *         serialized key is malformed
     */
    public synchronized Object next(Object key) throws IOException {
      if (key != null && key.getClass() != getKeyClass()) {
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);
      }

      if (!blockCompressed) {
        // Record format: read the whole record into outBuf, then deserialize
        // just the key from the front of it.
        outBuf.reset();
        
        keyLength = next(outBuf);
        if (keyLength < 0)
          return null;
        
        valBuffer.reset(outBuf.getData(), outBuf.getLength());
        
        key = deserializeKey(key);
        valBuffer.mark(0);
        // The key must consume exactly keyLength bytes of the record
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
                                + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;
        
        // Load the next compressed block once the current one is drained
        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return null;
          }
        }
        
        int keyLength = WritableUtils.readVInt(keyLenIn);
        
        // Sanity check: a negative length means no valid key
        if (keyLength < 0) {
          return null;
        }
        
        //Read another compressed 'key'
        key = deserializeKey(key);
        --noBufferedKeys;
      }

      return key;
    }
2528    
2529        @SuppressWarnings("unchecked")
2530        private Object deserializeKey(Object key) throws IOException {
2531          return keyDeserializer.deserialize(key);
2532        }
2533    
2534        /**
2535         * Read 'raw' values.
2536         * @param val - The 'raw' value
2537         * @return Returns the value length
2538         * @throws IOException
2539         */
2540        public synchronized int nextRawValue(ValueBytes val) 
2541          throws IOException {
2542          
2543          // Position stream to current value
2544          seekToCurrentValue();
2545     
2546          if (!blockCompressed) {
2547            int valLength = recordLength - keyLength;
2548            if (decompress) {
2549              CompressedBytes value = (CompressedBytes)val;
2550              value.reset(in, valLength);
2551            } else {
2552              UncompressedBytes value = (UncompressedBytes)val;
2553              value.reset(in, valLength);
2554            }
2555             
2556            return valLength;
2557          } else {
2558            int valLength = WritableUtils.readVInt(valLenIn);
2559            UncompressedBytes rawValue = (UncompressedBytes)val;
2560            rawValue.reset(valIn, valLength);
2561            --noBufferedValues;
2562            return valLength;
2563          }
2564          
2565        }
2566    
2567        private void handleChecksumException(ChecksumException e)
2568          throws IOException {
2569          if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
2570            LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
2571            sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
2572          } else {
2573            throw e;
2574          }
2575        }
2576    
    /** Disables sync-marker checking for this reader; often invoked for tmp
     * files (the sorter writes temp segments with sync disabled). */
    synchronized void ignoreSync() {
      sync = null;
    }
2581        
2582        /** Set the current byte position in the input file.
2583         *
2584         * <p>The position passed must be a position returned by {@link
2585         * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
2586         * position, use {@link SequenceFile.Reader#sync(long)}.
2587         */
2588        public synchronized void seek(long position) throws IOException {
2589          in.seek(position);
2590          if (blockCompressed) {                      // trigger block read
2591            noBufferedKeys = 0;
2592            valuesDecompressed = true;
2593          }
2594        }
2595    
    /** Seek to the next sync mark past a given position.
     * Scans byte-by-byte for the sync marker using a rolling circular buffer,
     * then positions the stream just before the sync entry.
     */
    public synchronized void sync(long position) throws IOException {
      // No room for a full sync entry before EOF: just go to the end
      if (position+SYNC_SIZE >= end) {
        seek(end);
        return;
      }

      if (position < headerEnd) {
        // seek directly to first record
        in.seek(headerEnd);
        // note the sync marker "seen" in the header
        syncSeen = true;
        return;
      }

      try {
        seek(position+4);                         // skip escape
        in.readFully(syncCheck);
        int syncLen = sync.length;
        // Treat syncCheck as a circular buffer: compare it (rotated by i)
        // against the marker, shifting in one new byte per iteration.
        for (int i = 0; in.getPos() < end; i++) {
          int j = 0;
          for (; j < syncLen; j++) {
            if (sync[j] != syncCheck[(i+j)%syncLen])
              break;
          }
          if (j == syncLen) {
            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
            return;
          }
          syncCheck[i%syncLen] = in.readByte();   // overwrite oldest byte
        }
      } catch (ChecksumException e) {             // checksum failure
        handleChecksumException(e);
      }
    }
2631    
    /** Returns true iff the previous call to next passed a sync mark.*/
    public synchronized boolean syncSeen() { return syncSeen; }
2634    
2635        /** Return the current byte position in the input file. */
2636        public synchronized long getPosition() throws IOException {
2637          return in.getPos();
2638        }
2639    
    /** Returns the name of the file being read. */
    @Override
    public String toString() {
      return filename;
    }
2645    
2646      }
2647    
2648      /** Sorts key/value pairs in a sequence-format file.
2649       *
2650       * <p>For best performance, applications should make sure that the {@link
2651       * Writable#readFields(DataInput)} implementation of their keys is
2652       * very efficient.  In particular, it should avoid allocating memory.
2653       */
2654      public static class Sorter {
2655    
    private RawComparator comparator;           // orders raw key bytes

    private MergeSort mergeSort; //the implementation of merge sort
    
    private Path[] inFiles;                     // when merging or sorting

    private Path outFile;                       // final sorted output

    private int memory; // bytes
    private int factor; // merged per pass

    private FileSystem fs = null;

    private Class keyClass;                     // key type of the files
    private Class valClass;                     // value type of the files

    private Configuration conf;
    private Metadata metadata;                  // written to the final output
    
    private Progressable progressable = null;   // optional progress reporting
2676    
    /** Sort and merge files containing the named classes.
     * Uses the registered {@link WritableComparator} for the key class. */
    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
                  Class valClass, Configuration conf)  {
      this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf);
    }
2682    
    /** Sort and merge using an arbitrary {@link RawComparator},
     * with empty output metadata. */
    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
                  Class valClass, Configuration conf) {
      this(fs, comparator, keyClass, valClass, conf, new Metadata());
    }
2688    
    /** Sort and merge using an arbitrary {@link RawComparator}. */
    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass,
                  Class valClass, Configuration conf, Metadata metadata) {
      this.fs = fs;
      this.comparator = comparator;
      this.keyClass = keyClass;
      this.valClass = valClass;
      // io.sort.mb (MB of buffer) and io.sort.factor (merge fan-in),
      // both overridable later via setMemory()/setFactor()
      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
      this.factor = conf.getInt("io.sort.factor", 100);
      this.conf = conf;
      this.metadata = metadata;
    }
2701    
    /** Set the number of streams to merge at once.*/
    public void setFactor(int factor) { this.factor = factor; }

    /** Get the number of streams to merge at once.*/
    public int getFactor() { return factor; }

    /** Set the total amount of buffer memory, in bytes.*/
    public void setMemory(int memory) { this.memory = memory; }

    /** Get the total amount of buffer memory, in bytes.*/
    public int getMemory() { return memory; }

    /** Set the progressable object in order to report progress. */
    public void setProgressable(Progressable progressable) {
      this.progressable = progressable;
    }
2718        
2719        /** 
2720         * Perform a file sort from a set of input files into an output file.
2721         * @param inFiles the files to be sorted
2722         * @param outFile the sorted output file
2723         * @param deleteInput should the input files be deleted as they are read?
2724         */
2725        public void sort(Path[] inFiles, Path outFile,
2726                         boolean deleteInput) throws IOException {
2727          if (fs.exists(outFile)) {
2728            throw new IOException("already exists: " + outFile);
2729          }
2730    
2731          this.inFiles = inFiles;
2732          this.outFile = outFile;
2733    
2734          int segments = sortPass(deleteInput);
2735          if (segments > 1) {
2736            mergePass(outFile.getParent());
2737          }
2738        }
2739    
2740        /** 
2741         * Perform a file sort from a set of input files and return an iterator.
2742         * @param inFiles the files to be sorted
2743         * @param tempDir the directory where temp files are created during sort
2744         * @param deleteInput should the input files be deleted as they are read?
2745         * @return iterator the RawKeyValueIterator
2746         */
2747        public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
2748                                                  boolean deleteInput) throws IOException {
2749          Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
2750          if (fs.exists(outFile)) {
2751            throw new IOException("already exists: " + outFile);
2752          }
2753          this.inFiles = inFiles;
2754          //outFile will basically be used as prefix for temp files in the cases
2755          //where sort outputs multiple sorted segments. For the single segment
2756          //case, the outputFile itself will contain the sorted data for that
2757          //segment
2758          this.outFile = outFile;
2759    
2760          int segments = sortPass(deleteInput);
2761          if (segments > 1)
2762            return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
2763                         tempDir);
2764          else if (segments == 1)
2765            return merge(new Path[]{outFile}, true, tempDir);
2766          else return null;
2767        }
2768    
2769        /**
2770         * The backwards compatible interface to sort.
2771         * @param inFile the input file to sort
2772         * @param outFile the sorted output file
2773         */
2774        public void sort(Path inFile, Path outFile) throws IOException {
2775          sort(new Path[]{inFile}, outFile, false);
2776        }
2777        
2778        private int sortPass(boolean deleteInput) throws IOException {
2779          if(LOG.isDebugEnabled()) {
2780            LOG.debug("running sort pass");
2781          }
2782          SortPass sortPass = new SortPass();         // make the SortPass
2783          sortPass.setProgressable(progressable);
2784          mergeSort = new MergeSort(sortPass.new SeqFileComparator());
2785          try {
2786            return sortPass.run(deleteInput);         // run it
2787          } finally {
2788            sortPass.close();                         // close it
2789          }
2790        }
2791    
    /**
     * One pass of the sort: reads records from the input files into an
     * in-memory buffer, sorts them by raw key bytes, and flushes each full
     * buffer as a sorted segment (plus an index of segment offsets/lengths
     * when more than one segment is produced).
     */
    private class SortPass {
      // Flush a segment once a quarter of the configured memory is buffered
      private int memoryLimit = memory/4;
      private int recordLimit = 1000000;          // max records per segment
      
      // All keys of the current segment, packed back-to-back
      private DataOutputBuffer rawKeys = new DataOutputBuffer();
      private byte[] rawBuffer;                   // rawKeys' backing array

      // Parallel arrays indexed by record number within the segment
      private int[] keyOffsets = new int[1024];   // offset of key in rawBuffer
      private int[] pointers = new int[keyOffsets.length];      // sort order
      private int[] pointersCopy = new int[keyOffsets.length];  // scratch
      private int[] keyLengths = new int[keyOffsets.length];
      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
      
      // NOTE(review): raw-typed and only ever cleared in this view — confirm use
      private ArrayList segmentLengths = new ArrayList();
      
      private Reader in = null;                   // current input file
      private FSDataOutputStream out = null;      // segment data output
      private FSDataOutputStream indexOut = null; // segment index output
      private Path outName;

      private Progressable progressable = null;

      /**
       * Read, sort and flush all input records.
       * @param deleteInput delete each input file once fully consumed
       * @return the number of segments written
       */
      public int run(boolean deleteInput) throws IOException {
        int segments = 0;
        int currentFile = 0;
        boolean atEof = (currentFile >= inFiles.length);
        CompressionType compressionType;
        CompressionCodec codec = null;
        segmentLengths.clear();
        if (atEof) {
          return 0;
        }
        
        // Initialize; output segments reuse the first input's compression
        in = new Reader(fs, inFiles[currentFile], conf);
        compressionType = in.getCompressionType();
        codec = in.getCompressionCodec();
        
        for (int i=0; i < rawValues.length; ++i) {
          rawValues[i] = null;
        }
        
        while (!atEof) {
          int count = 0;
          int bytesProcessed = 0;
          rawKeys.reset();
          // Fill the buffer until memory/record limits are hit or input ends
          while (!atEof && 
                 bytesProcessed < memoryLimit && count < recordLimit) {

            // Read a record into buffer
            // Note: Attempt to re-use 'rawValue' as far as possible
            int keyOffset = rawKeys.getLength();       
            ValueBytes rawValue = 
              (count == keyOffsets.length || rawValues[count] == null) ? 
              in.createValueBytes() : 
              rawValues[count];
            int recordLength = in.nextRaw(rawKeys, rawValue);
            if (recordLength == -1) {
              // Current input exhausted; advance to the next file (if any)
              in.close();
              if (deleteInput) {
                fs.delete(inFiles[currentFile], true);
              }
              currentFile += 1;
              atEof = currentFile >= inFiles.length;
              if (!atEof) {
                in = new Reader(fs, inFiles[currentFile], conf);
              } else {
                in = null;
              }
              continue;
            }

            int keyLength = rawKeys.getLength() - keyOffset;

            if (count == keyOffsets.length)
              grow();

            keyOffsets[count] = keyOffset;                // update pointers
            pointers[count] = count;
            keyLengths[count] = keyLength;
            rawValues[count] = rawValue;

            bytesProcessed += recordLength; 
            count++;
          }

          // buffer is full -- sort & flush it
          if(LOG.isDebugEnabled()) {
            LOG.debug("flushing segment " + segments);
          }
          rawBuffer = rawKeys.getData();
          sort(count);
          // indicate we're making progress
          if (progressable != null) {
            progressable.progress();
          }
          // 'done' (last arg) is true only when everything fit in one segment
          flush(count, bytesProcessed, compressionType, codec, 
                segments==0 && atEof);
          segments++;
        }
        return segments;
      }

      /** Close the input reader and both output streams, if open. */
      public void close() throws IOException {
        if (in != null) {
          in.close();
        }
        if (out != null) {
          out.close();
        }
        if (indexOut != null) {
          indexOut.close();
        }
      }

      // Grow all parallel arrays by 1.5x
      private void grow() {
        int newLength = keyOffsets.length * 3 / 2;
        keyOffsets = grow(keyOffsets, newLength);
        pointers = grow(pointers, newLength);
        pointersCopy = new int[newLength];
        keyLengths = grow(keyLengths, newLength);
        rawValues = grow(rawValues, newLength);
      }

      private int[] grow(int[] old, int newLength) {
        int[] result = new int[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        return result;
      }
      
      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
        ValueBytes[] result = new ValueBytes[newLength];
        System.arraycopy(old, 0, result, 0, old.length);
        for (int i=old.length; i < newLength; ++i) {
          result[i] = null;
        }
        return result;
      }

      /**
       * Write the sorted records as one segment.  The final segment goes to
       * outFile itself; intermediate segments are appended to outFile.0 with
       * their (start, length) recorded in outFile.0.index.
       */
      private void flush(int count, int bytesProcessed, 
                         CompressionType compressionType, 
                         CompressionCodec codec, 
                         boolean done) throws IOException {
        if (out == null) {
          outName = done ? outFile : outFile.suffix(".0");
          out = fs.create(outName);
          if (!done) {
            indexOut = fs.create(outName.suffix(".index"));
          }
        }

        long segmentStart = out.getPos();
        Writer writer = createWriter(conf, Writer.stream(out), 
            Writer.keyClass(keyClass), Writer.valueClass(valClass),
            Writer.compression(compressionType, codec),
            Writer.metadata(done ? metadata : new Metadata()));
        
        if (!done) {
          writer.sync = null;                     // disable sync on temp files
        }

        for (int i = 0; i < count; i++) {         // write in sorted order
          int p = pointers[i];
          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
        }
        writer.close();
        
        if (!done) {
          // Save the segment length
          WritableUtils.writeVLong(indexOut, segmentStart);
          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
          indexOut.flush();
        }
      }

      // Sort the first 'count' pointers; pointersCopy is merge-sort scratch
      private void sort(int count) {
        System.arraycopy(pointers, 0, pointersCopy, 0, count);
        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
      }
      /** Compares two records (given as pointer indices) by their raw key
       * bytes in rawBuffer, using the Sorter's comparator. */
      class SeqFileComparator implements Comparator<IntWritable> {
        @Override
        public int compare(IntWritable I, IntWritable J) {
          return comparator.compare(rawBuffer, keyOffsets[I.get()], 
                                    keyLengths[I.get()], rawBuffer, 
                                    keyOffsets[J.get()], keyLengths[J.get()]);
        }
      }
      
      /** set the progressable object in order to report progress */
      public void setProgressable(Progressable progressable)
      {
        this.progressable = progressable;
      }
      
    } // SequenceFile.Sorter.SortPass
2987    
    /** The interface to iterate over raw keys/values of SequenceFiles.
     * Call {@link #next()} to advance, then {@link #getKey()} /
     * {@link #getValue()} to access the current pair. */
    public static interface RawKeyValueIterator {
      /** Gets the current raw key
       * @return DataOutputBuffer holding the serialized key bytes
       * @throws IOException
       */
      DataOutputBuffer getKey() throws IOException; 
      /** Gets the current raw value
       * @return ValueBytes 
       * @throws IOException
       */
      ValueBytes getValue() throws IOException; 
      /** Sets up the current key and value (for getKey and getValue)
       * @return true if there exists a key/value, false otherwise 
       * @throws IOException
       */
      boolean next() throws IOException;
      /** closes the iterator so that the underlying streams can be closed
       * @throws IOException
       */
      void close() throws IOException;
      /** Gets the Progress object; this has a float (0.0 - 1.0) 
       * indicating the bytes processed by the iterator so far
       */
      Progress getProgress();
    }    
3014        
3015        /**
3016         * Merges the list of segments of type <code>SegmentDescriptor</code>
3017         * @param segments the list of SegmentDescriptors
3018         * @param tmpDir the directory to write temporary files into
3019         * @return RawKeyValueIterator
3020         * @throws IOException
3021         */
3022        public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
3023                                         Path tmpDir) 
3024          throws IOException {
3025          // pass in object to report progress, if present
3026          MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
3027          return mQueue.merge();
3028        }
3029    
3030        /**
3031         * Merges the contents of files passed in Path[] using a max factor value
3032         * that is already set
3033         * @param inNames the array of path names
3034         * @param deleteInputs true if the input files should be deleted when 
3035         * unnecessary
3036         * @param tmpDir the directory to write temporary files into
3037         * @return RawKeyValueIteratorMergeQueue
3038         * @throws IOException
3039         */
3040        public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3041                                         Path tmpDir) 
3042          throws IOException {
3043          return merge(inNames, deleteInputs, 
3044                       (inNames.length < factor) ? inNames.length : factor,
3045                       tmpDir);
3046        }
3047    
3048        /**
3049         * Merges the contents of files passed in Path[]
3050         * @param inNames the array of path names
3051         * @param deleteInputs true if the input files should be deleted when 
3052         * unnecessary
3053         * @param factor the factor that will be used as the maximum merge fan-in
3054         * @param tmpDir the directory to write temporary files into
3055         * @return RawKeyValueIteratorMergeQueue
3056         * @throws IOException
3057         */
3058        public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
3059                                         int factor, Path tmpDir) 
3060          throws IOException {
3061          //get the segments from inNames
3062          ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3063          for (int i = 0; i < inNames.length; i++) {
3064            SegmentDescriptor s = new SegmentDescriptor(0,
3065                fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3066            s.preserveInput(!deleteInputs);
3067            s.doSync();
3068            a.add(s);
3069          }
3070          this.factor = factor;
3071          MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
3072          return mQueue.merge();
3073        }
3074    
3075        /**
3076         * Merges the contents of files passed in Path[]
3077         * @param inNames the array of path names
3078         * @param tempDir the directory for creating temp files during merge
3079         * @param deleteInputs true if the input files should be deleted when 
3080         * unnecessary
3081         * @return RawKeyValueIteratorMergeQueue
3082         * @throws IOException
3083         */
3084        public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
3085                                         boolean deleteInputs) 
3086          throws IOException {
3087          //outFile will basically be used as prefix for temp files for the
3088          //intermediate merge outputs           
3089          this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
3090          //get the segments from inNames
3091          ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
3092          for (int i = 0; i < inNames.length; i++) {
3093            SegmentDescriptor s = new SegmentDescriptor(0,
3094                fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
3095            s.preserveInput(!deleteInputs);
3096            s.doSync();
3097            a.add(s);
3098          }
3099          factor = (inNames.length < factor) ? inNames.length : factor;
3100          // pass in object to report progress, if present
3101          MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
3102          return mQueue.merge();
3103        }
3104    
3105        /**
3106         * Clones the attributes (like compression of the input file and creates a 
3107         * corresponding Writer
3108         * @param inputFile the path of the input file whose attributes should be 
3109         * cloned
3110         * @param outputFile the path of the output file 
3111         * @param prog the Progressable to report status during the file write
3112         * @return Writer
3113         * @throws IOException
3114         */
3115        public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
3116                                          Progressable prog) throws IOException {
3117          Reader reader = new Reader(conf,
3118                                     Reader.file(inputFile),
3119                                     new Reader.OnlyHeaderOption());
3120          CompressionType compress = reader.getCompressionType();
3121          CompressionCodec codec = reader.getCompressionCodec();
3122          reader.close();
3123    
3124          Writer writer = createWriter(conf, 
3125                                       Writer.file(outputFile), 
3126                                       Writer.keyClass(keyClass), 
3127                                       Writer.valueClass(valClass), 
3128                                       Writer.compression(compress, codec), 
3129                                       Writer.progressable(prog));
3130          return writer;
3131        }
3132    
3133        /**
3134         * Writes records from RawKeyValueIterator into a file represented by the 
3135         * passed writer
3136         * @param records the RawKeyValueIterator
3137         * @param writer the Writer created earlier 
3138         * @throws IOException
3139         */
3140        public void writeFile(RawKeyValueIterator records, Writer writer) 
3141          throws IOException {
3142          while(records.next()) {
3143            writer.appendRaw(records.getKey().getData(), 0, 
3144                             records.getKey().getLength(), records.getValue());
3145          }
3146          writer.sync();
3147        }
3148            
3149        /** Merge the provided files.
3150         * @param inFiles the array of input path names
3151         * @param outFile the final output file
3152         * @throws IOException
3153         */
3154        public void merge(Path[] inFiles, Path outFile) throws IOException {
3155          if (fs.exists(outFile)) {
3156            throw new IOException("already exists: " + outFile);
3157          }
3158          RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
3159          Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
3160          
3161          writeFile(r, writer);
3162    
3163          writer.close();
3164        }
3165    
3166        /** sort calls this to generate the final merged output */
3167        private int mergePass(Path tmpDir) throws IOException {
3168          if(LOG.isDebugEnabled()) {
3169            LOG.debug("running merge pass");
3170          }
3171          Writer writer = cloneFileAttributes(
3172                                              outFile.suffix(".0"), outFile, null);
3173          RawKeyValueIterator r = merge(outFile.suffix(".0"), 
3174                                        outFile.suffix(".0.index"), tmpDir);
3175          writeFile(r, writer);
3176    
3177          writer.close();
3178          return 0;
3179        }
3180    
3181        /** Used by mergePass to merge the output of the sort
3182         * @param inName the name of the input file containing sorted segments
3183         * @param indexIn the offsets of the sorted segments
3184         * @param tmpDir the relative directory to store intermediate results in
3185         * @return RawKeyValueIterator
3186         * @throws IOException
3187         */
3188        private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
3189          throws IOException {
3190          //get the segments from indexIn
3191          //we create a SegmentContainer so that we can track segments belonging to
3192          //inName and delete inName as soon as we see that we have looked at all
3193          //the contained segments during the merge process & hence don't need 
3194          //them anymore
3195          SegmentContainer container = new SegmentContainer(inName, indexIn);
3196          MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
3197          return mQueue.merge();
3198        }
3199        
    /** This class implements the core of the merge logic: a priority queue of
     * segments ordered by their current key. Repeatedly yielding the smallest
     * head key produces the merged, sorted record stream. merge() runs in
     * passes of at most 'factor' segments; intermediate passes write temp
     * files that are fed back into the pool of pending segments.
     */
    private class MergeQueue extends PriorityQueue 
      implements RawKeyValueIterator {
      private boolean compress;       // record-compression flag shared by all queued segments
      private boolean blockCompress;  // block-compression flag shared by all queued segments
      private DataOutputBuffer rawKey = new DataOutputBuffer();
      private ValueBytes rawValue;
      private long totalBytesProcessed;  // running byte count, drives progress reporting
      private float progPerByte;  // progress per byte; set only for the final pass
      private Progress mergeProgress = new Progress();
      private Path tmpDir;  // where intermediate merge outputs are written
      private Progressable progress = null; //handle to the progress reporting object
      private SegmentDescriptor minSegment;  // segment whose record was last returned by next()
      
      //a TreeMap used to store the segments sorted by size (segment offset and
      //segment path name is used to break ties between segments of same sizes)
      private Map<SegmentDescriptor, Void> sortedSegmentSizes =
        new TreeMap<SegmentDescriptor, Void>();
            
      /** Adds a segment (whose next key must already be loaded) to the heap,
       * enforcing that every queued segment uses identical compression
       * settings — mixing compressed and uncompressed inputs is unsupported.
       * @param stream the segment to enqueue
       * @throws IOException if the segment's compression settings differ
       * from those of the segments already queued
       */
      @SuppressWarnings("unchecked")
      public void put(SegmentDescriptor stream) throws IOException {
        if (size() == 0) {
          // First segment establishes the expected compression settings.
          compress = stream.in.isCompressed();
          blockCompress = stream.in.isBlockCompressed();
        } else if (compress != stream.in.isCompressed() || 
                   blockCompress != stream.in.isBlockCompressed()) {
          throw new IOException("All merged files must be compressed or not.");
        } 
        super.put(stream);
      }
      
      /**
       * A queue of file segments to merge
       * @param segments the file segments to merge
       * @param tmpDir a relative local directory to save intermediate files in
       * @param progress the reference to the Progressable object
       */
      public MergeQueue(List <SegmentDescriptor> segments,
          Path tmpDir, Progressable progress) {
        int size = segments.size();
        for (int i = 0; i < size; i++) {
          // The TreeMap is used as a sorted set; the mapped value is unused.
          sortedSegmentSizes.put(segments.get(i), null);
        }
        this.tmpDir = tmpDir;
        this.progress = progress;
      }
      /** Heap ordering: compares the segments' current raw keys with the
       * Sorter's comparator, so the heap top is the smallest key. */
      @Override
      protected boolean lessThan(Object a, Object b) {
        // indicate we're making progress
        if (progress != null) {
          progress.progress();
        }
        SegmentDescriptor msa = (SegmentDescriptor)a;
        SegmentDescriptor msb = (SegmentDescriptor)b;
        return comparator.compare(msa.getKey().getData(), 0, 
                                  msa.getKey().getLength(), msb.getKey().getData(), 0, 
                                  msb.getKey().getLength()) < 0;
      }
      /** Pops and cleans up every remaining segment, closing their readers. */
      @Override
      public void close() throws IOException {
        SegmentDescriptor ms;                           // close inputs
        while ((ms = (SegmentDescriptor)pop()) != null) {
          ms.cleanup();
        }
        minSegment = null;
      }
      @Override
      public DataOutputBuffer getKey() throws IOException {
        return rawKey;
      }
      @Override
      public ValueBytes getValue() throws IOException {
        return rawValue;
      }
      /** Advances to the record with the next smallest key across all
       * segments; afterwards getKey()/getValue() return that record.
       * @return false once every segment is exhausted
       */
      @Override
      public boolean next() throws IOException {
        if (size() == 0)
          return false;
        if (minSegment != null) {
          //minSegment is non-null for all invocations of next except the first
          //one. For the first invocation, the priority queue is ready for use
          //but for the subsequent invocations, first adjust the queue 
          adjustPriorityQueue(minSegment);
          if (size() == 0) {
            minSegment = null;
            return false;
          }
        }
        minSegment = (SegmentDescriptor)top();
        long startPos = minSegment.in.getPosition(); // Current position in stream
        //save the raw key reference
        rawKey = minSegment.getKey();
        //load the raw value. Re-use the existing rawValue buffer
        if (rawValue == null) {
          rawValue = minSegment.in.createValueBytes();
        }
        minSegment.nextRawValue(rawValue);
        long endPos = minSegment.in.getPosition(); // End position after reading value
        updateProgress(endPos - startPos);
        return true;
      }
      
      @Override
      public Progress getProgress() {
        return mergeProgress; 
      }

      /** Reads the next key of ms and restores the heap invariant; if the
       * segment is exhausted it is popped and cleaned up instead. */
      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
        long startPos = ms.in.getPosition(); // Current position in stream
        boolean hasNext = ms.nextRawKey();
        long endPos = ms.in.getPosition(); // End position after reading key
        updateProgress(endPos - startPos);
        if (hasNext) {
          adjustTop();
        } else {
          pop();
          ms.cleanup();
        }
      }

      /** Accumulates processed bytes; only updates mergeProgress once
       * progPerByte has been set (which merge() does for the final pass). */
      private void updateProgress(long bytesProcessed) {
        totalBytesProcessed += bytesProcessed;
        if (progPerByte > 0) {
          mergeProgress.set(totalBytesProcessed * progPerByte);
        }
      }
      
      /** This is the single level merge that is called multiple times 
       * depending on the factor size and the number of segments. Each pass
       * merges up to 'factor' of the smallest segments; if more remain, the
       * merged output is written to a temp file and re-queued, otherwise this
       * iterator itself is returned for the caller to drain.
       * @return RawKeyValueIterator over the fully merged records
       * @throws IOException if reading segments or writing temp files fails
       */
      public RawKeyValueIterator merge() throws IOException {
        //create the MergeStreams from the sorted map created in the constructor
        //and dump the final output to a file
        int numSegments = sortedSegmentSizes.size();
        int origFactor = factor;
        int passNo = 1;
        LocalDirAllocator lDirAlloc = new LocalDirAllocator("io.seqfile.local.dir");
        do {
          //get the factor for this pass of merge
          factor = getPassFactor(passNo, numSegments);
          List<SegmentDescriptor> segmentsToMerge =
            new ArrayList<SegmentDescriptor>();
          int segmentsConsidered = 0;
          int numSegmentsToConsider = factor;
          while (true) {
            //extract the smallest 'factor' number of segment pointers from the 
            //TreeMap. Call cleanup on the empty segments (no key/value data)
            SegmentDescriptor[] mStream = 
              getSegmentDescriptors(numSegmentsToConsider);
            for (int i = 0; i < mStream.length; i++) {
              if (mStream[i].nextRawKey()) {
                segmentsToMerge.add(mStream[i]);
                segmentsConsidered++;
                // Count the fact that we read some bytes in calling nextRawKey()
                updateProgress(mStream[i].in.getPosition());
              }
              else {
                mStream[i].cleanup();
                numSegments--; //we ignore this segment for the merge
              }
            }
            //if we have the desired number of segments
            //or looked at all available segments, we break
            if (segmentsConsidered == factor || 
                sortedSegmentSizes.size() == 0) {
              break;
            }
              
            numSegmentsToConsider = factor - segmentsConsidered;
          }
          //feed the streams to the priority queue
          initialize(segmentsToMerge.size()); clear();
          for (int i = 0; i < segmentsToMerge.size(); i++) {
            put(segmentsToMerge.get(i));
          }
          //if we have lesser number of segments remaining, then just return the
          //iterator, else do another single level merge
          if (numSegments <= factor) {
            //calculate the length of the remaining segments. Required for 
            //calculating the merge progress
            long totalBytes = 0;
            for (int i = 0; i < segmentsToMerge.size(); i++) {
              totalBytes += segmentsToMerge.get(i).segmentLength;
            }
            if (totalBytes != 0) //being paranoid
              progPerByte = 1.0f / (float)totalBytes;
            //reset factor to what it originally was
            factor = origFactor;
            return this;
          } else {
            //we want to spread the creation of temp files on multiple disks if 
            //available under the space constraints
            long approxOutputSize = 0; 
            for (SegmentDescriptor s : segmentsToMerge) {
              approxOutputSize += s.segmentLength + 
                                  ChecksumFileSystem.getApproxChkSumLength(
                                  s.segmentLength);
            }
            Path tmpFilename = 
              new Path(tmpDir, "intermediate").suffix("." + passNo);

            Path outputFile =  lDirAlloc.getLocalPathForWrite(
                                                tmpFilename.toString(),
                                                approxOutputSize, conf);
            if(LOG.isDebugEnabled()) { 
              LOG.debug("writing intermediate results to " + outputFile);
            }
            Writer writer = cloneFileAttributes(
                                                fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
                                                fs.makeQualified(outputFile), null);
            writer.sync = null; //disable sync for temp files
            writeFile(this, writer);
            writer.close();
            
            //we finished one single level merge; now clean up the priority 
            //queue
            this.close();
            
            SegmentDescriptor tempSegment = 
              new SegmentDescriptor(0,
                  fs.getFileStatus(outputFile).getLen(), outputFile);
            //put the segment back in the TreeMap
            sortedSegmentSizes.put(tempSegment, null);
            numSegments = sortedSegmentSizes.size();
            passNo++;
          }
          //we are worried about only the first pass merge factor. So reset the 
          //factor to what it originally was
          factor = origFactor;
        } while(true);
      }
  
      //Hadoop-591
      /** Determines the fan-in for a given pass. Only the first pass may
       * merge fewer than 'factor' segments, so that every subsequent pass
       * merges exactly 'factor' (see HADOOP-591 for the rationale).
       * @param passNo the 1-based pass number
       * @param numSegments the number of segments still to be merged
       * @return the number of segments to merge in this pass
       */
      public int getPassFactor(int passNo, int numSegments) {
        if (passNo > 1 || numSegments <= factor || factor == 1) 
          return factor;
        int mod = (numSegments - 1) % (factor - 1);
        if (mod == 0)
          return factor;
        return mod + 1;
      }
      
      /** Return (& remove) the requested number of segment descriptors from the
       * sorted map. Returned segments are the smallest remaining, in
       * ascending (length, offset, path) order.
       */
      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
        if (numDescriptors > sortedSegmentSizes.size())
          numDescriptors = sortedSegmentSizes.size();
        SegmentDescriptor[] SegmentDescriptors = 
          new SegmentDescriptor[numDescriptors];
        Iterator iter = sortedSegmentSizes.keySet().iterator();
        int i = 0;
        while (i < numDescriptors) {
          SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
          iter.remove();  // consume the entry so it is not merged twice
        }
        return SegmentDescriptors;
      }
    } // SequenceFile.Sorter.MergeQueue
3461    
    /** This class defines a merge segment. This class can be subclassed to 
     * provide a customized cleanup method implementation. In this 
     * implementation, cleanup closes the file handle and deletes the file.
     * Instances order naturally by (length, offset, path), which is the order
     * the MergeQueue's TreeMap relies on.
     */
    public class SegmentDescriptor implements Comparable {
      
      long segmentOffset; //the start of the segment in the file
      long segmentLength; //the length of the segment
      Path segmentPathName; //the path name of the file containing the segment
      boolean ignoreSync = true; //set to true for temp files
      private Reader in = null;  // lazily opened by nextRawKey()
      private DataOutputBuffer rawKey = null; //this will hold the current key
      private boolean preserveInput = false; //delete input segment files?
      
      /** Constructs a segment
       * @param segmentOffset the offset of the segment in the file
       * @param segmentLength the length of the segment
       * @param segmentPathName the path name of the file containing the segment
       */
      public SegmentDescriptor (long segmentOffset, long segmentLength, 
                                Path segmentPathName) {
        this.segmentOffset = segmentOffset;
        this.segmentLength = segmentLength;
        this.segmentPathName = segmentPathName;
      }
      
      /** Do the sync checks (i.e. honor sync markers when reading) */
      public void doSync() {ignoreSync = false;}
      
      /** Whether to delete the files when no longer needed */
      public void preserveInput(boolean preserve) {
        preserveInput = preserve;
      }

      /** @return true if the backing file should NOT be deleted on cleanup */
      public boolean shouldPreserveInput() {
        return preserveInput;
      }
      
      /** Orders by segment length, then offset, then path name — consistent
       * with equals(), as required for use as a TreeMap key. */
      @Override
      public int compareTo(Object o) {
        SegmentDescriptor that = (SegmentDescriptor)o;
        if (this.segmentLength != that.segmentLength) {
          return (this.segmentLength < that.segmentLength ? -1 : 1);
        }
        if (this.segmentOffset != that.segmentOffset) {
          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
        }
        return (this.segmentPathName.toString()).
          compareTo(that.segmentPathName.toString());
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof SegmentDescriptor)) {
          return false;
        }
        SegmentDescriptor that = (SegmentDescriptor)o;
        if (this.segmentLength == that.segmentLength &&
            this.segmentOffset == that.segmentOffset &&
            this.segmentPathName.toString().equals(
              that.segmentPathName.toString())) {
          return true;
        }
        return false;
      }

      /** Hashes on the offset only; this still satisfies the hashCode
       * contract because equal descriptors share the same offset. */
      @Override
      public int hashCode() {
        return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
      }

      /** Fills up the rawKey object with the key returned by the Reader.
       * On first call, lazily opens a Reader positioned on this segment and
       * verifies its key/value classes match the Sorter's.
       * @return true if there is a key returned; false, otherwise
       * @throws IOException if the reader cannot be opened, the key/value
       * classes mismatch, or reading fails
       */
      public boolean nextRawKey() throws IOException {
        if (in == null) {
          int bufferSize = getBufferSize(conf); 
          Reader reader = new Reader(conf,
                                     Reader.file(segmentPathName), 
                                     Reader.bufferSize(bufferSize),
                                     Reader.start(segmentOffset), 
                                     Reader.length(segmentLength));
        
          //sometimes we ignore syncs especially for temp merge files
          if (ignoreSync) reader.ignoreSync();

          if (reader.getKeyClass() != keyClass)
            throw new IOException("wrong key class: " + reader.getKeyClass() +
                                  " is not " + keyClass);
          if (reader.getValueClass() != valClass)
            throw new IOException("wrong value class: "+reader.getValueClass()+
                                  " is not " + valClass);
          this.in = reader;
          rawKey = new DataOutputBuffer();
        }
        rawKey.reset();
        int keyLength = 
          in.nextRawKey(rawKey);
        return (keyLength >= 0);  // negative length signals end of segment
      }

      /** Fills up the passed rawValue with the value corresponding to the key
       * read earlier
       * @param rawValue the buffer to fill with the raw value bytes
       * @return the length of the value
       * @throws IOException if reading the value fails
       */
      public int nextRawValue(ValueBytes rawValue) throws IOException {
        int valLength = in.nextRawValue(rawValue);
        return valLength;
      }
      
      /** Returns the stored rawKey */
      public DataOutputBuffer getKey() {
        return rawKey;
      }
      
      /** closes the underlying reader */
      private void close() throws IOException {
        this.in.close();
        this.in = null;
      }

      /** The default cleanup. Subclasses can override this with a custom 
       * cleanup: closes the reader and deletes the backing file unless
       * preserveInput was requested.
       */
      public void cleanup() throws IOException {
        close();
        if (!preserveInput) {
          fs.delete(segmentPathName, true);
        }
      }
    } // SequenceFile.Sorter.SegmentDescriptor
3596        
3597        /** This class provisions multiple segments contained within a single
3598         *  file
3599         */
3600        private class LinkedSegmentsDescriptor extends SegmentDescriptor {
3601    
3602          SegmentContainer parentContainer = null;
3603    
3604          /** Constructs a segment
3605           * @param segmentOffset the offset of the segment in the file
3606           * @param segmentLength the length of the segment
3607           * @param segmentPathName the path name of the file containing the segment
3608           * @param parent the parent SegmentContainer that holds the segment
3609           */
3610          public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
3611                                           Path segmentPathName, SegmentContainer parent) {
3612            super(segmentOffset, segmentLength, segmentPathName);
3613            this.parentContainer = parent;
3614          }
3615          /** The default cleanup. Subclasses can override this with a custom 
3616           * cleanup 
3617           */
3618          @Override
3619          public void cleanup() throws IOException {
3620            super.close();
3621            if (super.shouldPreserveInput()) return;
3622            parentContainer.cleanup();
3623          }
3624          
3625          @Override
3626          public boolean equals(Object o) {
3627            if (!(o instanceof LinkedSegmentsDescriptor)) {
3628              return false;
3629            }
3630            return super.equals(o);
3631          }
3632        } //SequenceFile.Sorter.LinkedSegmentsDescriptor
3633    
3634        /** The class that defines a container for segments to be merged. Primarily
3635         * required to delete temp files as soon as all the contained segments
3636         * have been looked at */
3637        private class SegmentContainer {
3638          private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
3639          private int numSegmentsContained; //# of segments contained
3640          private Path inName; //input file from where segments are created
3641          
3642          //the list of segments read from the file
3643          private ArrayList <SegmentDescriptor> segments = 
3644            new ArrayList <SegmentDescriptor>();
3645          /** This constructor is there primarily to serve the sort routine that 
3646           * generates a single output file with an associated index file */
3647          public SegmentContainer(Path inName, Path indexIn) throws IOException {
3648            //get the segments from indexIn
3649            FSDataInputStream fsIndexIn = fs.open(indexIn);
3650            long end = fs.getFileStatus(indexIn).getLen();
3651            while (fsIndexIn.getPos() < end) {
3652              long segmentOffset = WritableUtils.readVLong(fsIndexIn);
3653              long segmentLength = WritableUtils.readVLong(fsIndexIn);
3654              Path segmentName = inName;
3655              segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
3656                                                        segmentLength, segmentName, this));
3657            }
3658            fsIndexIn.close();
3659            fs.delete(indexIn, true);
3660            numSegmentsContained = segments.size();
3661            this.inName = inName;
3662          }
3663    
3664          public List <SegmentDescriptor> getSegmentList() {
3665            return segments;
3666          }
3667          public void cleanup() throws IOException {
3668            numSegmentsCleanedUp++;
3669            if (numSegmentsCleanedUp == numSegmentsContained) {
3670              fs.delete(inName, true);
3671            }
3672          }
3673        } //SequenceFile.Sorter.SegmentContainer
3674    
3675      } // SequenceFile.Sorter
3676    
3677    } // SequenceFile