001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.HadoopIllegalArgumentException;
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.io.IOUtils;
035import org.apache.hadoop.io.SequenceFile.CompressionType;
036import org.apache.hadoop.io.compress.CompressionCodec;
037import org.apache.hadoop.util.Options;
038import org.apache.hadoop.util.Progressable;
039import org.apache.hadoop.util.ReflectionUtils;
040
041/** A file-based map from keys to values.
042 * 
043 * <p>A map is a directory containing two files, the <code>data</code> file,
044 * containing all keys and values in the map, and a smaller <code>index</code>
045 * file, containing a fraction of the keys.  The fraction is determined by
046 * {@link Writer#getIndexInterval()}.
047 *
048 * <p>The index file is read entirely into memory.  Thus key implementations
049 * should try to keep themselves small.
050 *
051 * <p>Map files are created by adding entries in-order.  To maintain a large
052 * database, perform updates by copying the previous version of a database and
053 * merging in a sorted change list, to create a new version of the database in
054 * a new file.  Sorting large change lists can be done with {@link
055 * SequenceFile.Sorter}.
056 */
057@InterfaceAudience.Public
058@InterfaceStability.Stable
059public class MapFile {
060  private static final Log LOG = LogFactory.getLog(MapFile.class);
061
062  /** The name of the index file. */
063  public static final String INDEX_FILE_NAME = "index";
064
065  /** The name of the data file. */
066  public static final String DATA_FILE_NAME = "data";
067
068  protected MapFile() {}                          // no public ctor
069
070  /** Writes a new map. */
071  public static class Writer implements java.io.Closeable {
072    private SequenceFile.Writer data;
073    private SequenceFile.Writer index;
074
075    final private static String INDEX_INTERVAL = "io.map.index.interval";
076    private int indexInterval = 128;
077
078    private long size;
079    private LongWritable position = new LongWritable();
080
081    // the following fields are used only for checking key order
082    private WritableComparator comparator;
083    private DataInputBuffer inBuf = new DataInputBuffer();
084    private DataOutputBuffer outBuf = new DataOutputBuffer();
085    private WritableComparable lastKey;
086
087    /** What's the position (in bytes) we wrote when we got the last index */
088    private long lastIndexPos = -1;
089
090    /**
091     * What was size when we last wrote an index. Set to MIN_VALUE to ensure that
092     * we have an index at position zero -- midKey will throw an exception if this
093     * is not the case
094     */
095    private long lastIndexKeyCount = Long.MIN_VALUE;
096
097
098    /** Create the named map for keys of the named class. 
099     * @deprecated Use Writer(Configuration, Path, Option...) instead.
100     */
101    @Deprecated
102    public Writer(Configuration conf, FileSystem fs, String dirName,
103                  Class<? extends WritableComparable> keyClass, 
104                  Class valClass) throws IOException {
105      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
106    }
107
108    /** Create the named map for keys of the named class. 
109     * @deprecated Use Writer(Configuration, Path, Option...) instead.
110     */
111    @Deprecated
112    public Writer(Configuration conf, FileSystem fs, String dirName,
113                  Class<? extends WritableComparable> keyClass, Class valClass,
114                  CompressionType compress, 
115                  Progressable progress) throws IOException {
116      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
117           compression(compress), progressable(progress));
118    }
119
120    /** Create the named map for keys of the named class. 
121     * @deprecated Use Writer(Configuration, Path, Option...) instead.
122     */
123    @Deprecated
124    public Writer(Configuration conf, FileSystem fs, String dirName,
125                  Class<? extends WritableComparable> keyClass, Class valClass,
126                  CompressionType compress, CompressionCodec codec,
127                  Progressable progress) throws IOException {
128      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
129           compression(compress, codec), progressable(progress));
130    }
131
132    /** Create the named map for keys of the named class. 
133     * @deprecated Use Writer(Configuration, Path, Option...) instead.
134     */
135    @Deprecated
136    public Writer(Configuration conf, FileSystem fs, String dirName,
137                  Class<? extends WritableComparable> keyClass, Class valClass,
138                  CompressionType compress) throws IOException {
139      this(conf, new Path(dirName), keyClass(keyClass),
140           valueClass(valClass), compression(compress));
141    }
142
143    /** Create the named map using the named key comparator. 
144     * @deprecated Use Writer(Configuration, Path, Option...) instead.
145     */
146    @Deprecated
147    public Writer(Configuration conf, FileSystem fs, String dirName,
148                  WritableComparator comparator, Class valClass
149                  ) throws IOException {
150      this(conf, new Path(dirName), comparator(comparator), 
151           valueClass(valClass));
152    }
153
154    /** Create the named map using the named key comparator. 
155     * @deprecated Use Writer(Configuration, Path, Option...) instead.
156     */
157    @Deprecated
158    public Writer(Configuration conf, FileSystem fs, String dirName,
159                  WritableComparator comparator, Class valClass,
160                  SequenceFile.CompressionType compress) throws IOException {
161      this(conf, new Path(dirName), comparator(comparator),
162           valueClass(valClass), compression(compress));
163    }
164
165    /** Create the named map using the named key comparator. 
166     * @deprecated Use Writer(Configuration, Path, Option...)} instead.
167     */
168    @Deprecated
169    public Writer(Configuration conf, FileSystem fs, String dirName,
170                  WritableComparator comparator, Class valClass,
171                  SequenceFile.CompressionType compress,
172                  Progressable progress) throws IOException {
173      this(conf, new Path(dirName), comparator(comparator),
174           valueClass(valClass), compression(compress),
175           progressable(progress));
176    }
177
178    /** Create the named map using the named key comparator. 
179     * @deprecated Use Writer(Configuration, Path, Option...) instead.
180     */
181    @Deprecated
182    public Writer(Configuration conf, FileSystem fs, String dirName,
183                  WritableComparator comparator, Class valClass,
184                  SequenceFile.CompressionType compress, CompressionCodec codec,
185                  Progressable progress) throws IOException {
186      this(conf, new Path(dirName), comparator(comparator),
187           valueClass(valClass), compression(compress, codec),
188           progressable(progress));
189    }
190    
191    // our options are a superset of sequence file writer options
192    public static interface Option extends SequenceFile.Writer.Option { }
193    
194    private static class KeyClassOption extends Options.ClassOption
195                                        implements Option {
196      KeyClassOption(Class<?> value) {
197        super(value);
198      }
199    }
200    
201    private static class ComparatorOption implements Option {
202      private final WritableComparator value;
203      ComparatorOption(WritableComparator value) {
204        this.value = value;
205      }
206      WritableComparator getValue() {
207        return value;
208      }
209    }
210
211    public static Option keyClass(Class<? extends WritableComparable> value) {
212      return new KeyClassOption(value);
213    }
214    
215    public static Option comparator(WritableComparator value) {
216      return new ComparatorOption(value);
217    }
218
219    public static SequenceFile.Writer.Option valueClass(Class<?> value) {
220      return SequenceFile.Writer.valueClass(value);
221    }
222    
223    public static 
224    SequenceFile.Writer.Option compression(CompressionType type) {
225      return SequenceFile.Writer.compression(type);
226    }
227
228    public static 
229    SequenceFile.Writer.Option compression(CompressionType type,
230        CompressionCodec codec) {
231      return SequenceFile.Writer.compression(type, codec);
232    }
233
234    public static SequenceFile.Writer.Option progressable(Progressable value) {
235      return SequenceFile.Writer.progressable(value);
236    }
237
238    @SuppressWarnings("unchecked")
239    public Writer(Configuration conf, 
240                  Path dirName,
241                  SequenceFile.Writer.Option... opts
242                  ) throws IOException {
243      KeyClassOption keyClassOption = 
244        Options.getOption(KeyClassOption.class, opts);
245      ComparatorOption comparatorOption =
246        Options.getOption(ComparatorOption.class, opts);
247      if ((keyClassOption == null) == (comparatorOption == null)) {
248        throw new IllegalArgumentException("key class or comparator option "
249                                           + "must be set");
250      }
251      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
252
253      Class<? extends WritableComparable> keyClass;
254      if (keyClassOption == null) {
255        this.comparator = comparatorOption.getValue();
256        keyClass = comparator.getKeyClass();
257      } else {
258        keyClass= 
259          (Class<? extends WritableComparable>) keyClassOption.getValue();
260        this.comparator = WritableComparator.get(keyClass, conf);
261      }
262      this.lastKey = comparator.newKey();
263      FileSystem fs = dirName.getFileSystem(conf);
264
265      if (!fs.mkdirs(dirName)) {
266        throw new IOException("Mkdirs failed to create directory " + dirName);
267      }
268      Path dataFile = new Path(dirName, DATA_FILE_NAME);
269      Path indexFile = new Path(dirName, INDEX_FILE_NAME);
270
271      SequenceFile.Writer.Option[] dataOptions =
272        Options.prependOptions(opts, 
273                               SequenceFile.Writer.file(dataFile),
274                               SequenceFile.Writer.keyClass(keyClass));
275      this.data = SequenceFile.createWriter(conf, dataOptions);
276
277      SequenceFile.Writer.Option[] indexOptions =
278        Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
279            SequenceFile.Writer.keyClass(keyClass),
280            SequenceFile.Writer.valueClass(LongWritable.class),
281            SequenceFile.Writer.compression(CompressionType.BLOCK));
282      this.index = SequenceFile.createWriter(conf, indexOptions);      
283    }
284
285    /** The number of entries that are added before an index entry is added.*/
286    public int getIndexInterval() { return indexInterval; }
287
288    /** Sets the index interval.
289     * @see #getIndexInterval()
290     */
291    public void setIndexInterval(int interval) { indexInterval = interval; }
292
293    /** Sets the index interval and stores it in conf
294     * @see #getIndexInterval()
295     */
296    public static void setIndexInterval(Configuration conf, int interval) {
297      conf.setInt(INDEX_INTERVAL, interval);
298    }
299
300    /** Close the map. */
301    @Override
302    public synchronized void close() throws IOException {
303      data.close();
304      index.close();
305    }
306
307    /** Append a key/value pair to the map.  The key must be greater or equal
308     * to the previous key added to the map. */
309    public synchronized void append(WritableComparable key, Writable val)
310      throws IOException {
311
312      checkKey(key);
313
314      long pos = data.getLength();      
315      // Only write an index if we've changed positions. In a block compressed
316      // file, this means we write an entry at the start of each block      
317      if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
318        position.set(pos);                        // point to current eof
319        index.append(key, position);
320        lastIndexPos = pos;
321        lastIndexKeyCount = size;
322      }
323
324      data.append(key, val);                      // append key/value to data
325      size++;
326    }
327
328    private void checkKey(WritableComparable key) throws IOException {
329      // check that keys are well-ordered
330      if (size != 0 && comparator.compare(lastKey, key) > 0)
331        throw new IOException("key out of order: "+key+" after "+lastKey);
332          
333      // update lastKey with a copy of key by writing and reading
334      outBuf.reset();
335      key.write(outBuf);                          // write new key
336
337      inBuf.reset(outBuf.getData(), outBuf.getLength());
338      lastKey.readFields(inBuf);                  // read into lastKey
339    }
340
341  }
342  
343  /** Provide access to an existing map. */
344  public static class Reader implements java.io.Closeable {
345      
346    /** Number of index entries to skip between each entry.  Zero by default.
347     * Setting this to values larger than zero can facilitate opening large map
348     * files using less memory. */
349    private int INDEX_SKIP = 0;
350      
351    private WritableComparator comparator;
352
353    private WritableComparable nextKey;
354    private long seekPosition = -1;
355    private int seekIndex = -1;
356    private long firstPosition;
357
358    // the data, on disk
359    private SequenceFile.Reader data;
360    private SequenceFile.Reader index;
361
362    // whether the index Reader was closed
363    private boolean indexClosed = false;
364
365    // the index, in memory
366    private int count = -1;
367    private WritableComparable[] keys;
368    private long[] positions;
369
370    /** Returns the class of keys in this file. */
371    public Class<?> getKeyClass() { return data.getKeyClass(); }
372
373    /** Returns the class of values in this file. */
374    public Class<?> getValueClass() { return data.getValueClass(); }
375
376    public static interface Option extends SequenceFile.Reader.Option {}
377    
378    public static Option comparator(WritableComparator value) {
379      return new ComparatorOption(value);
380    }
381
382    static class ComparatorOption implements Option {
383      private final WritableComparator value;
384      ComparatorOption(WritableComparator value) {
385        this.value = value;
386      }
387      WritableComparator getValue() {
388        return value;
389      }
390    }
391
392    public Reader(Path dir, Configuration conf,
393                  SequenceFile.Reader.Option... opts) throws IOException {
394      ComparatorOption comparatorOption = 
395        Options.getOption(ComparatorOption.class, opts);
396      WritableComparator comparator =
397        comparatorOption == null ? null : comparatorOption.getValue();
398      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
399      open(dir, comparator, conf, opts);
400    }
401 
402    /** Construct a map reader for the named map.
403     * @deprecated
404     */
405    @Deprecated
406    public Reader(FileSystem fs, String dirName, 
407                  Configuration conf) throws IOException {
408      this(new Path(dirName), conf);
409    }
410
411    /** Construct a map reader for the named map using the named comparator.
412     * @deprecated
413     */
414    @Deprecated
415    public Reader(FileSystem fs, String dirName, WritableComparator comparator, 
416                  Configuration conf) throws IOException {
417      this(new Path(dirName), conf, comparator(comparator));
418    }
419    
420    protected synchronized void open(Path dir,
421                                     WritableComparator comparator,
422                                     Configuration conf, 
423                                     SequenceFile.Reader.Option... options
424                                     ) throws IOException {
425      Path dataFile = new Path(dir, DATA_FILE_NAME);
426      Path indexFile = new Path(dir, INDEX_FILE_NAME);
427
428      // open the data
429      this.data = createDataFileReader(dataFile, conf, options);
430      this.firstPosition = data.getPosition();
431
432      if (comparator == null) {
433        Class<? extends WritableComparable> cls;
434        cls = data.getKeyClass().asSubclass(WritableComparable.class);
435        this.comparator = WritableComparator.get(cls, conf);
436      } else {
437        this.comparator = comparator;
438      }
439
440      // open the index
441      SequenceFile.Reader.Option[] indexOptions =
442        Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
443      this.index = new SequenceFile.Reader(conf, indexOptions);
444    }
445
446    /**
447     * Override this method to specialize the type of
448     * {@link SequenceFile.Reader} returned.
449     */
450    protected SequenceFile.Reader 
451      createDataFileReader(Path dataFile, Configuration conf,
452                           SequenceFile.Reader.Option... options
453                           ) throws IOException {
454      SequenceFile.Reader.Option[] newOptions =
455        Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
456      return new SequenceFile.Reader(conf, newOptions);
457    }
458
459    private void readIndex() throws IOException {
460      // read the index entirely into memory
461      if (this.keys != null)
462        return;
463      this.count = 0;
464      this.positions = new long[1024];
465
466      try {
467        int skip = INDEX_SKIP;
468        LongWritable position = new LongWritable();
469        WritableComparable lastKey = null;
470        long lastIndex = -1;
471        ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
472        while (true) {
473          WritableComparable k = comparator.newKey();
474
475          if (!index.next(k, position))
476            break;
477
478          // check order to make sure comparator is compatible
479          if (lastKey != null && comparator.compare(lastKey, k) > 0)
480            throw new IOException("key out of order: "+k+" after "+lastKey);
481          lastKey = k;
482          if (skip > 0) {
483            skip--;
484            continue;                             // skip this entry
485          } else {
486            skip = INDEX_SKIP;                    // reset skip
487          }
488
489          // don't read an index that is the same as the previous one. Block
490          // compressed map files used to do this (multiple entries would point
491          // at the same block)
492          if (position.get() == lastIndex)
493            continue;
494
495          if (count == positions.length) {
496            positions = Arrays.copyOf(positions, positions.length * 2);
497          }
498
499          keyBuilder.add(k);
500          positions[count] = position.get();
501          count++;
502        }
503
504        this.keys = keyBuilder.toArray(new WritableComparable[count]);
505        positions = Arrays.copyOf(positions, count);
506      } catch (EOFException e) {
507        LOG.warn("Unexpected EOF reading " + index +
508                              " at entry #" + count + ".  Ignoring.");
509      } finally {
510        indexClosed = true;
511        index.close();
512      }
513    }
514
515    /** Re-positions the reader before its first key. */
516    public synchronized void reset() throws IOException {
517      data.seek(firstPosition);
518    }
519
520    /** Get the key at approximately the middle of the file. Or null if the
521     *  file is empty. 
522     */
523    public synchronized WritableComparable midKey() throws IOException {
524
525      readIndex();
526      if (count == 0) {
527        return null;
528      }
529    
530      return keys[(count - 1) / 2];
531    }
532    
533    /** Reads the final key from the file.
534     *
535     * @param key key to read into
536     */
537    public synchronized void finalKey(WritableComparable key)
538      throws IOException {
539
540      long originalPosition = data.getPosition(); // save position
541      try {
542        readIndex();                              // make sure index is valid
543        if (count > 0) {
544          data.seek(positions[count-1]);          // skip to last indexed entry
545        } else {
546          reset();                                // start at the beginning
547        }
548        while (data.next(key)) {}                 // scan to eof
549
550      } finally {
551        data.seek(originalPosition);              // restore position
552      }
553    }
554
555    /** Positions the reader at the named key, or if none such exists, at the
556     * first entry after the named key.  Returns true iff the named key exists
557     * in this map.
558     */
559    public synchronized boolean seek(WritableComparable key) throws IOException {
560      return seekInternal(key) == 0;
561    }
562
563    /** 
564     * Positions the reader at the named key, or if none such exists, at the
565     * first entry after the named key.
566     *
567     * @return  0   - exact match found
568     *          < 0 - positioned at next record
569     *          1   - no more records in file
570     */
571    private synchronized int seekInternal(WritableComparable key)
572      throws IOException {
573      return seekInternal(key, false);
574    }
575
576    /** 
577     * Positions the reader at the named key, or if none such exists, at the
578     * key that falls just before or just after dependent on how the
579     * <code>before</code> parameter is set.
580     * 
581     * @param before - IF true, and <code>key</code> does not exist, position
582     * file at entry that falls just before <code>key</code>.  Otherwise,
583     * position file at record that sorts just after.
584     * @return  0   - exact match found
585     *          < 0 - positioned at next record
586     *          1   - no more records in file
587     */
588    private synchronized int seekInternal(WritableComparable key,
589        final boolean before)
590      throws IOException {
591      readIndex();                                // make sure index is read
592
593      if (seekIndex != -1                         // seeked before
594          && seekIndex+1 < count           
595          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
596          && comparator.compare(key, nextKey)
597          >= 0) {                                 // but after last seeked
598        // do nothing
599      } else {
600        seekIndex = binarySearch(key);
601        if (seekIndex < 0)                        // decode insertion point
602          seekIndex = -seekIndex-2;
603
604        if (seekIndex == -1)                      // belongs before first entry
605          seekPosition = firstPosition;           // use beginning of file
606        else
607          seekPosition = positions[seekIndex];    // else use index
608      }
609      data.seek(seekPosition);
610      
611      if (nextKey == null)
612        nextKey = comparator.newKey();
613     
614      // If we're looking for the key before, we need to keep track
615      // of the position we got the current key as well as the position
616      // of the key before it.
617      long prevPosition = -1;
618      long curPosition = seekPosition;
619
620      while (data.next(nextKey)) {
621        int c = comparator.compare(key, nextKey);
622        if (c <= 0) {                             // at or beyond desired
623          if (before && c != 0) {
624            if (prevPosition == -1) {
625              // We're on the first record of this index block
626              // and we've already passed the search key. Therefore
627              // we must be at the beginning of the file, so seek
628              // to the beginning of this block and return c
629              data.seek(curPosition);
630            } else {
631              // We have a previous record to back up to
632              data.seek(prevPosition);
633              data.next(nextKey);
634              // now that we've rewound, the search key must be greater than this key
635              return 1;
636            }
637          }
638          return c;
639        }
640        if (before) {
641          prevPosition = curPosition;
642          curPosition = data.getPosition();
643        }
644      }
645
646      return 1;
647    }
648
649    private int binarySearch(WritableComparable key) {
650      int low = 0;
651      int high = count-1;
652
653      while (low <= high) {
654        int mid = (low + high) >>> 1;
655        WritableComparable midVal = keys[mid];
656        int cmp = comparator.compare(midVal, key);
657
658        if (cmp < 0)
659          low = mid + 1;
660        else if (cmp > 0)
661          high = mid - 1;
662        else
663          return mid;                             // key found
664      }
665      return -(low + 1);                          // key not found.
666    }
667
668    /** Read the next key/value pair in the map into <code>key</code> and
669     * <code>val</code>.  Returns true if such a pair exists and false when at
670     * the end of the map */
671    public synchronized boolean next(WritableComparable key, Writable val)
672      throws IOException {
673      return data.next(key, val);
674    }
675
676    /** Return the value for the named key, or null if none exists. */
677    public synchronized Writable get(WritableComparable key, Writable val)
678      throws IOException {
679      if (seek(key)) {
680        data.getCurrentValue(val);
681        return val;
682      } else
683        return null;
684    }
685
686    /** 
687     * Finds the record that is the closest match to the specified key.
688     * Returns <code>key</code> or if it does not exist, at the first entry
689     * after the named key.
690     * 
691-     * @param key       - key that we're trying to find
692-     * @param val       - data value if key is found
693-     * @return          - the key that was the closest match or null if eof.
694     */
695    public synchronized WritableComparable getClosest(WritableComparable key,
696      Writable val)
697    throws IOException {
698      return getClosest(key, val, false);
699    }
700
701    /** 
702     * Finds the record that is the closest match to the specified key.
703     * 
704     * @param key       - key that we're trying to find
705     * @param val       - data value if key is found
706     * @param before    - IF true, and <code>key</code> does not exist, return
707     * the first entry that falls just before the <code>key</code>.  Otherwise,
708     * return the record that sorts just after.
709     * @return          - the key that was the closest match or null if eof.
710     */
711    public synchronized WritableComparable getClosest(WritableComparable key,
712        Writable val, final boolean before)
713      throws IOException {
714     
715      int c = seekInternal(key, before);
716
717      // If we didn't get an exact match, and we ended up in the wrong
718      // direction relative to the query key, return null since we
719      // must be at the beginning or end of the file.
720      if ((!before && c > 0) ||
721          (before && c < 0)) {
722        return null;
723      }
724
725      data.getCurrentValue(val);
726      return nextKey;
727    }
728
729    /** Close the map. */
730    @Override
731    public synchronized void close() throws IOException {
732      if (!indexClosed) {
733        index.close();
734      }
735      data.close();
736    }
737
738  }
739
740  /** Renames an existing map directory. */
741  public static void rename(FileSystem fs, String oldName, String newName)
742    throws IOException {
743    Path oldDir = new Path(oldName);
744    Path newDir = new Path(newName);
745    if (!fs.rename(oldDir, newDir)) {
746      throw new IOException("Could not rename " + oldDir + " to " + newDir);
747    }
748  }
749
750  /** Deletes the named map file. */
751  public static void delete(FileSystem fs, String name) throws IOException {
752    Path dir = new Path(name);
753    Path data = new Path(dir, DATA_FILE_NAME);
754    Path index = new Path(dir, INDEX_FILE_NAME);
755
756    fs.delete(data, true);
757    fs.delete(index, true);
758    fs.delete(dir, true);
759  }
760
761  /**
762   * This method attempts to fix a corrupt MapFile by re-creating its index.
763   * @param fs filesystem
764   * @param dir directory containing the MapFile data and index
765   * @param keyClass key class (has to be a subclass of Writable)
766   * @param valueClass value class (has to be a subclass of Writable)
767   * @param dryrun do not perform any changes, just report what needs to be done
768   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
769   * @throws Exception
770   */
771  public static long fix(FileSystem fs, Path dir,
772                         Class<? extends Writable> keyClass,
773                         Class<? extends Writable> valueClass, boolean dryrun,
774                         Configuration conf) throws Exception {
775    String dr = (dryrun ? "[DRY RUN ] " : "");
776    Path data = new Path(dir, DATA_FILE_NAME);
777    Path index = new Path(dir, INDEX_FILE_NAME);
778    int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
779    if (!fs.exists(data)) {
780      // there's nothing we can do to fix this!
781      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
782    }
783    if (fs.exists(index)) {
784      // no fixing needed
785      return -1;
786    }
787    SequenceFile.Reader dataReader = 
788      new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
789    if (!dataReader.getKeyClass().equals(keyClass)) {
790      throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
791                          ", got " + dataReader.getKeyClass().getName());
792    }
793    if (!dataReader.getValueClass().equals(valueClass)) {
794      throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
795                          ", got " + dataReader.getValueClass().getName());
796    }
797    long cnt = 0L;
798    Writable key = ReflectionUtils.newInstance(keyClass, conf);
799    Writable value = ReflectionUtils.newInstance(valueClass, conf);
800    SequenceFile.Writer indexWriter = null;
801    if (!dryrun) {
802      indexWriter = 
803        SequenceFile.createWriter(conf, 
804                                  SequenceFile.Writer.file(index), 
805                                  SequenceFile.Writer.keyClass(keyClass), 
806                                  SequenceFile.Writer.valueClass
807                                    (LongWritable.class));
808    }
809    try {
810      long pos = 0L;
811      LongWritable position = new LongWritable();
812      while(dataReader.next(key, value)) {
813        cnt++;
814        if (cnt % indexInterval == 0) {
815          position.set(pos);
816          if (!dryrun) indexWriter.append(key, position);
817        }
818        pos = dataReader.getPosition();
819      }
820    } catch(Throwable t) {
821      // truncated data file. swallow it.
822    }
823    dataReader.close();
824    if (!dryrun) indexWriter.close();
825    return cnt;
826  }
827
828  /**
829   * Class to merge multiple MapFiles of same Key and Value types to one MapFile
830   */
831  public static class Merger {
832    private Configuration conf;
833    private WritableComparator comparator = null;
834    private Reader[] inReaders;
835    private Writer outWriter;
836    private Class<Writable> valueClass = null;
837    private Class<WritableComparable> keyClass = null;
838
839    public Merger(Configuration conf) throws IOException {
840      this.conf = conf;
841    }
842
843    /**
844     * Merge multiple MapFiles to one Mapfile
845     *
846     * @param inMapFiles
847     * @param outMapFile
848     * @throws IOException
849     */
850    public void merge(Path[] inMapFiles, boolean deleteInputs,
851        Path outMapFile) throws IOException {
852      try {
853        open(inMapFiles, outMapFile);
854        mergePass();
855      } finally {
856        close();
857      }
858      if (deleteInputs) {
859        for (int i = 0; i < inMapFiles.length; i++) {
860          Path path = inMapFiles[i];
861          delete(path.getFileSystem(conf), path.toString());
862        }
863      }
864    }
865
866    /*
867     * Open all input files for reading and verify the key and value types. And
868     * open Output file for writing
869     */
870    @SuppressWarnings("unchecked")
871    private void open(Path[] inMapFiles, Path outMapFile) throws IOException {
872      inReaders = new Reader[inMapFiles.length];
873      for (int i = 0; i < inMapFiles.length; i++) {
874        Reader reader = new Reader(inMapFiles[i], conf);
875        if (keyClass == null || valueClass == null) {
876          keyClass = (Class<WritableComparable>) reader.getKeyClass();
877          valueClass = (Class<Writable>) reader.getValueClass();
878        } else if (keyClass != reader.getKeyClass()
879            || valueClass != reader.getValueClass()) {
880          throw new HadoopIllegalArgumentException(
881              "Input files cannot be merged as they"
882                  + " have different Key and Value classes");
883        }
884        inReaders[i] = reader;
885      }
886
887      if (comparator == null) {
888        Class<? extends WritableComparable> cls;
889        cls = keyClass.asSubclass(WritableComparable.class);
890        this.comparator = WritableComparator.get(cls, conf);
891      } else if (comparator.getKeyClass() != keyClass) {
892        throw new HadoopIllegalArgumentException(
893            "Input files cannot be merged as they"
894                + " have different Key class compared to"
895                + " specified comparator");
896      }
897
898      outWriter = new MapFile.Writer(conf, outMapFile,
899          MapFile.Writer.keyClass(keyClass),
900          MapFile.Writer.valueClass(valueClass));
901    }
902
903    /**
904     * Merge all input files to output map file.<br>
905     * 1. Read first key/value from all input files to keys/values array. <br>
906     * 2. Select the least key and corresponding value. <br>
907     * 3. Write the selected key and value to output file. <br>
908     * 4. Replace the already written key/value in keys/values arrays with the
909     * next key/value from the selected input <br>
910     * 5. Repeat step 2-4 till all keys are read. <br>
911     */
912    private void mergePass() throws IOException {
913      // re-usable array
914      WritableComparable[] keys = new WritableComparable[inReaders.length];
915      Writable[] values = new Writable[inReaders.length];
916      // Read first key/value from all inputs
917      for (int i = 0; i < inReaders.length; i++) {
918        keys[i] = ReflectionUtils.newInstance(keyClass, null);
919        values[i] = ReflectionUtils.newInstance(valueClass, null);
920        if (!inReaders[i].next(keys[i], values[i])) {
921          // Handle empty files
922          keys[i] = null;
923          values[i] = null;
924        }
925      }
926
927      do {
928        int currentEntry = -1;
929        WritableComparable currentKey = null;
930        Writable currentValue = null;
931        for (int i = 0; i < keys.length; i++) {
932          if (keys[i] == null) {
933            // Skip Readers reached EOF
934            continue;
935          }
936          if (currentKey == null || comparator.compare(currentKey, keys[i]) > 0) {
937            currentEntry = i;
938            currentKey = keys[i];
939            currentValue = values[i];
940          }
941        }
942        if (currentKey == null) {
943          // Merge Complete
944          break;
945        }
946        // Write the selected key/value to merge stream
947        outWriter.append(currentKey, currentValue);
948        // Replace the already written key/value in keys/values arrays with the
949        // next key/value from the selected input
950        if (!inReaders[currentEntry].next(keys[currentEntry],
951            values[currentEntry])) {
952          // EOF for this file
953          keys[currentEntry] = null;
954          values[currentEntry] = null;
955        }
956      } while (true);
957    }
958
959    private void close() throws IOException {
960      for (int i = 0; i < inReaders.length; i++) {
961        IOUtils.closeStream(inReaders[i]);
962        inReaders[i] = null;
963      }
964      if (outWriter != null) {
965        outWriter.close();
966        outWriter = null;
967      }
968    }
969  }
970
971  public static void main(String[] args) throws Exception {
972    String usage = "Usage: MapFile inFile outFile";
973      
974    if (args.length != 2) {
975      System.err.println(usage);
976      System.exit(-1);
977    }
978      
979    String in = args[0];
980    String out = args[1];
981
982    Configuration conf = new Configuration();
983    FileSystem fs = FileSystem.getLocal(conf);
984    MapFile.Reader reader = null;
985    MapFile.Writer writer = null;
986    try {
987      reader = new MapFile.Reader(fs, in, conf);
988      writer =
989        new MapFile.Writer(conf, fs, out,
990            reader.getKeyClass().asSubclass(WritableComparable.class),
991            reader.getValueClass());
992
993      WritableComparable key = ReflectionUtils.newInstance(reader.getKeyClass()
994        .asSubclass(WritableComparable.class), conf);
995      Writable value = ReflectionUtils.newInstance(reader.getValueClass()
996        .asSubclass(Writable.class), conf);
997
998      while (reader.next(key, value))               // copy all entries
999        writer.append(key, value);
1000    } finally {
1001      IOUtils.cleanup(LOG, writer, reader);
1002    }
1003  }
1004}