001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.io.SequenceFile.CompressionType;
034import org.apache.hadoop.io.compress.CompressionCodec;
035import org.apache.hadoop.util.Options;
036import org.apache.hadoop.util.Progressable;
037import org.apache.hadoop.util.ReflectionUtils;
038
039/** A file-based map from keys to values.
040 * 
041 * <p>A map is a directory containing two files, the <code>data</code> file,
042 * containing all keys and values in the map, and a smaller <code>index</code>
043 * file, containing a fraction of the keys.  The fraction is determined by
044 * {@link Writer#getIndexInterval()}.
045 *
046 * <p>The index file is read entirely into memory.  Thus key implementations
047 * should try to keep themselves small.
048 *
049 * <p>Map files are created by adding entries in-order.  To maintain a large
050 * database, perform updates by copying the previous version of a database and
051 * merging in a sorted change list, to create a new version of the database in
052 * a new file.  Sorting large change lists can be done with {@link
053 * SequenceFile.Sorter}.
054 */
055@InterfaceAudience.Public
056@InterfaceStability.Stable
057public class MapFile {
058  private static final Log LOG = LogFactory.getLog(MapFile.class);
059
060  /** The name of the index file. */
061  public static final String INDEX_FILE_NAME = "index";
062
063  /** The name of the data file. */
064  public static final String DATA_FILE_NAME = "data";
065
066  protected MapFile() {}                          // no public ctor
067
068  /** Writes a new map. */
069  public static class Writer implements java.io.Closeable {
070    private SequenceFile.Writer data;
071    private SequenceFile.Writer index;
072
073    final private static String INDEX_INTERVAL = "io.map.index.interval";
074    private int indexInterval = 128;
075
076    private long size;
077    private LongWritable position = new LongWritable();
078
079    // the following fields are used only for checking key order
080    private WritableComparator comparator;
081    private DataInputBuffer inBuf = new DataInputBuffer();
082    private DataOutputBuffer outBuf = new DataOutputBuffer();
083    private WritableComparable lastKey;
084
085    /** What's the position (in bytes) we wrote when we got the last index */
086    private long lastIndexPos = -1;
087
088    /**
089     * What was size when we last wrote an index. Set to MIN_VALUE to ensure that
090     * we have an index at position zero -- midKey will throw an exception if this
091     * is not the case
092     */
093    private long lastIndexKeyCount = Long.MIN_VALUE;
094
095
096    /** Create the named map for keys of the named class. 
097     * @deprecated Use Writer(Configuration, Path, Option...) instead.
098     */
099    @Deprecated
100    public Writer(Configuration conf, FileSystem fs, String dirName,
101                  Class<? extends WritableComparable> keyClass, 
102                  Class valClass) throws IOException {
103      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
104    }
105
106    /** Create the named map for keys of the named class. 
107     * @deprecated Use Writer(Configuration, Path, Option...) instead.
108     */
109    @Deprecated
110    public Writer(Configuration conf, FileSystem fs, String dirName,
111                  Class<? extends WritableComparable> keyClass, Class valClass,
112                  CompressionType compress, 
113                  Progressable progress) throws IOException {
114      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
115           compression(compress), progressable(progress));
116    }
117
118    /** Create the named map for keys of the named class. 
119     * @deprecated Use Writer(Configuration, Path, Option...) instead.
120     */
121    @Deprecated
122    public Writer(Configuration conf, FileSystem fs, String dirName,
123                  Class<? extends WritableComparable> keyClass, Class valClass,
124                  CompressionType compress, CompressionCodec codec,
125                  Progressable progress) throws IOException {
126      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
127           compression(compress, codec), progressable(progress));
128    }
129
130    /** Create the named map for keys of the named class. 
131     * @deprecated Use Writer(Configuration, Path, Option...) instead.
132     */
133    @Deprecated
134    public Writer(Configuration conf, FileSystem fs, String dirName,
135                  Class<? extends WritableComparable> keyClass, Class valClass,
136                  CompressionType compress) throws IOException {
137      this(conf, new Path(dirName), keyClass(keyClass),
138           valueClass(valClass), compression(compress));
139    }
140
141    /** Create the named map using the named key comparator. 
142     * @deprecated Use Writer(Configuration, Path, Option...) instead.
143     */
144    @Deprecated
145    public Writer(Configuration conf, FileSystem fs, String dirName,
146                  WritableComparator comparator, Class valClass
147                  ) throws IOException {
148      this(conf, new Path(dirName), comparator(comparator), 
149           valueClass(valClass));
150    }
151
152    /** Create the named map using the named key comparator. 
153     * @deprecated Use Writer(Configuration, Path, Option...) instead.
154     */
155    @Deprecated
156    public Writer(Configuration conf, FileSystem fs, String dirName,
157                  WritableComparator comparator, Class valClass,
158                  SequenceFile.CompressionType compress) throws IOException {
159      this(conf, new Path(dirName), comparator(comparator),
160           valueClass(valClass), compression(compress));
161    }
162
163    /** Create the named map using the named key comparator. 
164     * @deprecated Use Writer(Configuration, Path, Option...)} instead.
165     */
166    @Deprecated
167    public Writer(Configuration conf, FileSystem fs, String dirName,
168                  WritableComparator comparator, Class valClass,
169                  SequenceFile.CompressionType compress,
170                  Progressable progress) throws IOException {
171      this(conf, new Path(dirName), comparator(comparator),
172           valueClass(valClass), compression(compress),
173           progressable(progress));
174    }
175
176    /** Create the named map using the named key comparator. 
177     * @deprecated Use Writer(Configuration, Path, Option...) instead.
178     */
179    @Deprecated
180    public Writer(Configuration conf, FileSystem fs, String dirName,
181                  WritableComparator comparator, Class valClass,
182                  SequenceFile.CompressionType compress, CompressionCodec codec,
183                  Progressable progress) throws IOException {
184      this(conf, new Path(dirName), comparator(comparator),
185           valueClass(valClass), compression(compress, codec),
186           progressable(progress));
187    }
188    
189    // our options are a superset of sequence file writer options
190    public static interface Option extends SequenceFile.Writer.Option { }
191    
192    private static class KeyClassOption extends Options.ClassOption
193                                        implements Option {
194      KeyClassOption(Class<?> value) {
195        super(value);
196      }
197    }
198    
199    private static class ComparatorOption implements Option {
200      private final WritableComparator value;
201      ComparatorOption(WritableComparator value) {
202        this.value = value;
203      }
204      WritableComparator getValue() {
205        return value;
206      }
207    }
208
209    public static Option keyClass(Class<? extends WritableComparable> value) {
210      return new KeyClassOption(value);
211    }
212    
213    public static Option comparator(WritableComparator value) {
214      return new ComparatorOption(value);
215    }
216
217    public static SequenceFile.Writer.Option valueClass(Class<?> value) {
218      return SequenceFile.Writer.valueClass(value);
219    }
220    
221    public static 
222    SequenceFile.Writer.Option compression(CompressionType type) {
223      return SequenceFile.Writer.compression(type);
224    }
225
226    public static 
227    SequenceFile.Writer.Option compression(CompressionType type,
228        CompressionCodec codec) {
229      return SequenceFile.Writer.compression(type, codec);
230    }
231
232    public static SequenceFile.Writer.Option progressable(Progressable value) {
233      return SequenceFile.Writer.progressable(value);
234    }
235
236    @SuppressWarnings("unchecked")
237    public Writer(Configuration conf, 
238                  Path dirName,
239                  SequenceFile.Writer.Option... opts
240                  ) throws IOException {
241      KeyClassOption keyClassOption = 
242        Options.getOption(KeyClassOption.class, opts);
243      ComparatorOption comparatorOption =
244        Options.getOption(ComparatorOption.class, opts);
245      if ((keyClassOption == null) == (comparatorOption == null)) {
246        throw new IllegalArgumentException("key class or comparator option "
247                                           + "must be set");
248      }
249      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
250
251      Class<? extends WritableComparable> keyClass;
252      if (keyClassOption == null) {
253        this.comparator = comparatorOption.getValue();
254        keyClass = comparator.getKeyClass();
255      } else {
256        keyClass= 
257          (Class<? extends WritableComparable>) keyClassOption.getValue();
258        this.comparator = WritableComparator.get(keyClass);
259      }
260      this.lastKey = comparator.newKey();
261      FileSystem fs = dirName.getFileSystem(conf);
262
263      if (!fs.mkdirs(dirName)) {
264        throw new IOException("Mkdirs failed to create directory " + dirName);
265      }
266      Path dataFile = new Path(dirName, DATA_FILE_NAME);
267      Path indexFile = new Path(dirName, INDEX_FILE_NAME);
268
269      SequenceFile.Writer.Option[] dataOptions =
270        Options.prependOptions(opts, 
271                               SequenceFile.Writer.file(dataFile),
272                               SequenceFile.Writer.keyClass(keyClass));
273      this.data = SequenceFile.createWriter(conf, dataOptions);
274
275      SequenceFile.Writer.Option[] indexOptions =
276        Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
277            SequenceFile.Writer.keyClass(keyClass),
278            SequenceFile.Writer.valueClass(LongWritable.class),
279            SequenceFile.Writer.compression(CompressionType.BLOCK));
280      this.index = SequenceFile.createWriter(conf, indexOptions);      
281    }
282
283    /** The number of entries that are added before an index entry is added.*/
284    public int getIndexInterval() { return indexInterval; }
285
286    /** Sets the index interval.
287     * @see #getIndexInterval()
288     */
289    public void setIndexInterval(int interval) { indexInterval = interval; }
290
291    /** Sets the index interval and stores it in conf
292     * @see #getIndexInterval()
293     */
294    public static void setIndexInterval(Configuration conf, int interval) {
295      conf.setInt(INDEX_INTERVAL, interval);
296    }
297
298    /** Close the map. */
299    public synchronized void close() throws IOException {
300      data.close();
301      index.close();
302    }
303
304    /** Append a key/value pair to the map.  The key must be greater or equal
305     * to the previous key added to the map. */
306    public synchronized void append(WritableComparable key, Writable val)
307      throws IOException {
308
309      checkKey(key);
310
311      long pos = data.getLength();      
312      // Only write an index if we've changed positions. In a block compressed
313      // file, this means we write an entry at the start of each block      
314      if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
315        position.set(pos);                        // point to current eof
316        index.append(key, position);
317        lastIndexPos = pos;
318        lastIndexKeyCount = size;
319      }
320
321      data.append(key, val);                      // append key/value to data
322      size++;
323    }
324
325    private void checkKey(WritableComparable key) throws IOException {
326      // check that keys are well-ordered
327      if (size != 0 && comparator.compare(lastKey, key) > 0)
328        throw new IOException("key out of order: "+key+" after "+lastKey);
329          
330      // update lastKey with a copy of key by writing and reading
331      outBuf.reset();
332      key.write(outBuf);                          // write new key
333
334      inBuf.reset(outBuf.getData(), outBuf.getLength());
335      lastKey.readFields(inBuf);                  // read into lastKey
336    }
337
338  }
339  
340  /** Provide access to an existing map. */
341  public static class Reader implements java.io.Closeable {
342      
343    /** Number of index entries to skip between each entry.  Zero by default.
344     * Setting this to values larger than zero can facilitate opening large map
345     * files using less memory. */
346    private int INDEX_SKIP = 0;
347      
348    private WritableComparator comparator;
349
350    private WritableComparable nextKey;
351    private long seekPosition = -1;
352    private int seekIndex = -1;
353    private long firstPosition;
354
355    // the data, on disk
356    private SequenceFile.Reader data;
357    private SequenceFile.Reader index;
358
359    // whether the index Reader was closed
360    private boolean indexClosed = false;
361
362    // the index, in memory
363    private int count = -1;
364    private WritableComparable[] keys;
365    private long[] positions;
366
367    /** Returns the class of keys in this file. */
368    public Class<?> getKeyClass() { return data.getKeyClass(); }
369
370    /** Returns the class of values in this file. */
371    public Class<?> getValueClass() { return data.getValueClass(); }
372
373    public static interface Option extends SequenceFile.Reader.Option {}
374    
375    public static Option comparator(WritableComparator value) {
376      return new ComparatorOption(value);
377    }
378
379    static class ComparatorOption implements Option {
380      private final WritableComparator value;
381      ComparatorOption(WritableComparator value) {
382        this.value = value;
383      }
384      WritableComparator getValue() {
385        return value;
386      }
387    }
388
389    public Reader(Path dir, Configuration conf,
390                  SequenceFile.Reader.Option... opts) throws IOException {
391      ComparatorOption comparatorOption = 
392        Options.getOption(ComparatorOption.class, opts);
393      WritableComparator comparator =
394        comparatorOption == null ? null : comparatorOption.getValue();
395      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
396      open(dir, comparator, conf, opts);
397    }
398 
399    /** Construct a map reader for the named map.
400     * @deprecated
401     */
402    @Deprecated
403    public Reader(FileSystem fs, String dirName, 
404                  Configuration conf) throws IOException {
405      this(new Path(dirName), conf);
406    }
407
408    /** Construct a map reader for the named map using the named comparator.
409     * @deprecated
410     */
411    @Deprecated
412    public Reader(FileSystem fs, String dirName, WritableComparator comparator, 
413                  Configuration conf) throws IOException {
414      this(new Path(dirName), conf, comparator(comparator));
415    }
416    
417    protected synchronized void open(Path dir,
418                                     WritableComparator comparator,
419                                     Configuration conf, 
420                                     SequenceFile.Reader.Option... options
421                                     ) throws IOException {
422      Path dataFile = new Path(dir, DATA_FILE_NAME);
423      Path indexFile = new Path(dir, INDEX_FILE_NAME);
424
425      // open the data
426      this.data = createDataFileReader(dataFile, conf, options);
427      this.firstPosition = data.getPosition();
428
429      if (comparator == null)
430        this.comparator = 
431          WritableComparator.get(data.getKeyClass().
432                                   asSubclass(WritableComparable.class));
433      else
434        this.comparator = comparator;
435
436      // open the index
437      SequenceFile.Reader.Option[] indexOptions =
438        Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
439      this.index = new SequenceFile.Reader(conf, indexOptions);
440    }
441
442    /**
443     * Override this method to specialize the type of
444     * {@link SequenceFile.Reader} returned.
445     */
446    protected SequenceFile.Reader 
447      createDataFileReader(Path dataFile, Configuration conf,
448                           SequenceFile.Reader.Option... options
449                           ) throws IOException {
450      SequenceFile.Reader.Option[] newOptions =
451        Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
452      return new SequenceFile.Reader(conf, newOptions);
453    }
454
455    private void readIndex() throws IOException {
456      // read the index entirely into memory
457      if (this.keys != null)
458        return;
459      this.count = 0;
460      this.positions = new long[1024];
461
462      try {
463        int skip = INDEX_SKIP;
464        LongWritable position = new LongWritable();
465        WritableComparable lastKey = null;
466        long lastIndex = -1;
467        ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
468        while (true) {
469          WritableComparable k = comparator.newKey();
470
471          if (!index.next(k, position))
472            break;
473
474          // check order to make sure comparator is compatible
475          if (lastKey != null && comparator.compare(lastKey, k) > 0)
476            throw new IOException("key out of order: "+k+" after "+lastKey);
477          lastKey = k;
478          if (skip > 0) {
479            skip--;
480            continue;                             // skip this entry
481          } else {
482            skip = INDEX_SKIP;                    // reset skip
483          }
484
485          // don't read an index that is the same as the previous one. Block
486          // compressed map files used to do this (multiple entries would point
487          // at the same block)
488          if (position.get() == lastIndex)
489            continue;
490
491          if (count == positions.length) {
492            positions = Arrays.copyOf(positions, positions.length * 2);
493          }
494
495          keyBuilder.add(k);
496          positions[count] = position.get();
497          count++;
498        }
499
500        this.keys = keyBuilder.toArray(new WritableComparable[count]);
501        positions = Arrays.copyOf(positions, count);
502      } catch (EOFException e) {
503        LOG.warn("Unexpected EOF reading " + index +
504                              " at entry #" + count + ".  Ignoring.");
505      } finally {
506        indexClosed = true;
507        index.close();
508      }
509    }
510
511    /** Re-positions the reader before its first key. */
512    public synchronized void reset() throws IOException {
513      data.seek(firstPosition);
514    }
515
516    /** Get the key at approximately the middle of the file. Or null if the
517     *  file is empty. 
518     */
519    public synchronized WritableComparable midKey() throws IOException {
520
521      readIndex();
522      if (count == 0) {
523        return null;
524      }
525    
526      return keys[(count - 1) / 2];
527    }
528    
529    /** Reads the final key from the file.
530     *
531     * @param key key to read into
532     */
533    public synchronized void finalKey(WritableComparable key)
534      throws IOException {
535
536      long originalPosition = data.getPosition(); // save position
537      try {
538        readIndex();                              // make sure index is valid
539        if (count > 0) {
540          data.seek(positions[count-1]);          // skip to last indexed entry
541        } else {
542          reset();                                // start at the beginning
543        }
544        while (data.next(key)) {}                 // scan to eof
545
546      } finally {
547        data.seek(originalPosition);              // restore position
548      }
549    }
550
551    /** Positions the reader at the named key, or if none such exists, at the
552     * first entry after the named key.  Returns true iff the named key exists
553     * in this map.
554     */
555    public synchronized boolean seek(WritableComparable key) throws IOException {
556      return seekInternal(key) == 0;
557    }
558
559    /** 
560     * Positions the reader at the named key, or if none such exists, at the
561     * first entry after the named key.
562     *
563     * @return  0   - exact match found
564     *          < 0 - positioned at next record
565     *          1   - no more records in file
566     */
567    private synchronized int seekInternal(WritableComparable key)
568      throws IOException {
569      return seekInternal(key, false);
570    }
571
572    /** 
573     * Positions the reader at the named key, or if none such exists, at the
574     * key that falls just before or just after dependent on how the
575     * <code>before</code> parameter is set.
576     * 
577     * @param before - IF true, and <code>key</code> does not exist, position
578     * file at entry that falls just before <code>key</code>.  Otherwise,
579     * position file at record that sorts just after.
580     * @return  0   - exact match found
581     *          < 0 - positioned at next record
582     *          1   - no more records in file
583     */
584    private synchronized int seekInternal(WritableComparable key,
585        final boolean before)
586      throws IOException {
587      readIndex();                                // make sure index is read
588
589      if (seekIndex != -1                         // seeked before
590          && seekIndex+1 < count           
591          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
592          && comparator.compare(key, nextKey)
593          >= 0) {                                 // but after last seeked
594        // do nothing
595      } else {
596        seekIndex = binarySearch(key);
597        if (seekIndex < 0)                        // decode insertion point
598          seekIndex = -seekIndex-2;
599
600        if (seekIndex == -1)                      // belongs before first entry
601          seekPosition = firstPosition;           // use beginning of file
602        else
603          seekPosition = positions[seekIndex];    // else use index
604      }
605      data.seek(seekPosition);
606      
607      if (nextKey == null)
608        nextKey = comparator.newKey();
609     
610      // If we're looking for the key before, we need to keep track
611      // of the position we got the current key as well as the position
612      // of the key before it.
613      long prevPosition = -1;
614      long curPosition = seekPosition;
615
616      while (data.next(nextKey)) {
617        int c = comparator.compare(key, nextKey);
618        if (c <= 0) {                             // at or beyond desired
619          if (before && c != 0) {
620            if (prevPosition == -1) {
621              // We're on the first record of this index block
622              // and we've already passed the search key. Therefore
623              // we must be at the beginning of the file, so seek
624              // to the beginning of this block and return c
625              data.seek(curPosition);
626            } else {
627              // We have a previous record to back up to
628              data.seek(prevPosition);
629              data.next(nextKey);
630              // now that we've rewound, the search key must be greater than this key
631              return 1;
632            }
633          }
634          return c;
635        }
636        if (before) {
637          prevPosition = curPosition;
638          curPosition = data.getPosition();
639        }
640      }
641
642      return 1;
643    }
644
645    private int binarySearch(WritableComparable key) {
646      int low = 0;
647      int high = count-1;
648
649      while (low <= high) {
650        int mid = (low + high) >>> 1;
651        WritableComparable midVal = keys[mid];
652        int cmp = comparator.compare(midVal, key);
653
654        if (cmp < 0)
655          low = mid + 1;
656        else if (cmp > 0)
657          high = mid - 1;
658        else
659          return mid;                             // key found
660      }
661      return -(low + 1);                          // key not found.
662    }
663
664    /** Read the next key/value pair in the map into <code>key</code> and
665     * <code>val</code>.  Returns true if such a pair exists and false when at
666     * the end of the map */
667    public synchronized boolean next(WritableComparable key, Writable val)
668      throws IOException {
669      return data.next(key, val);
670    }
671
672    /** Return the value for the named key, or null if none exists. */
673    public synchronized Writable get(WritableComparable key, Writable val)
674      throws IOException {
675      if (seek(key)) {
676        data.getCurrentValue(val);
677        return val;
678      } else
679        return null;
680    }
681
682    /** 
683     * Finds the record that is the closest match to the specified key.
684     * Returns <code>key</code> or if it does not exist, at the first entry
685     * after the named key.
686     * 
687-     * @param key       - key that we're trying to find
688-     * @param val       - data value if key is found
689-     * @return          - the key that was the closest match or null if eof.
690     */
691    public synchronized WritableComparable getClosest(WritableComparable key,
692      Writable val)
693    throws IOException {
694      return getClosest(key, val, false);
695    }
696
697    /** 
698     * Finds the record that is the closest match to the specified key.
699     * 
700     * @param key       - key that we're trying to find
701     * @param val       - data value if key is found
702     * @param before    - IF true, and <code>key</code> does not exist, return
703     * the first entry that falls just before the <code>key</code>.  Otherwise,
704     * return the record that sorts just after.
705     * @return          - the key that was the closest match or null if eof.
706     */
707    public synchronized WritableComparable getClosest(WritableComparable key,
708        Writable val, final boolean before)
709      throws IOException {
710     
711      int c = seekInternal(key, before);
712
713      // If we didn't get an exact match, and we ended up in the wrong
714      // direction relative to the query key, return null since we
715      // must be at the beginning or end of the file.
716      if ((!before && c > 0) ||
717          (before && c < 0)) {
718        return null;
719      }
720
721      data.getCurrentValue(val);
722      return nextKey;
723    }
724
725    /** Close the map. */
726    public synchronized void close() throws IOException {
727      if (!indexClosed) {
728        index.close();
729      }
730      data.close();
731    }
732
733  }
734
735  /** Renames an existing map directory. */
736  public static void rename(FileSystem fs, String oldName, String newName)
737    throws IOException {
738    Path oldDir = new Path(oldName);
739    Path newDir = new Path(newName);
740    if (!fs.rename(oldDir, newDir)) {
741      throw new IOException("Could not rename " + oldDir + " to " + newDir);
742    }
743  }
744
745  /** Deletes the named map file. */
746  public static void delete(FileSystem fs, String name) throws IOException {
747    Path dir = new Path(name);
748    Path data = new Path(dir, DATA_FILE_NAME);
749    Path index = new Path(dir, INDEX_FILE_NAME);
750
751    fs.delete(data, true);
752    fs.delete(index, true);
753    fs.delete(dir, true);
754  }
755
756  /**
757   * This method attempts to fix a corrupt MapFile by re-creating its index.
758   * @param fs filesystem
759   * @param dir directory containing the MapFile data and index
760   * @param keyClass key class (has to be a subclass of Writable)
761   * @param valueClass value class (has to be a subclass of Writable)
762   * @param dryrun do not perform any changes, just report what needs to be done
763   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
764   * @throws Exception
765   */
766  public static long fix(FileSystem fs, Path dir,
767                         Class<? extends Writable> keyClass,
768                         Class<? extends Writable> valueClass, boolean dryrun,
769                         Configuration conf) throws Exception {
770    String dr = (dryrun ? "[DRY RUN ] " : "");
771    Path data = new Path(dir, DATA_FILE_NAME);
772    Path index = new Path(dir, INDEX_FILE_NAME);
773    int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
774    if (!fs.exists(data)) {
775      // there's nothing we can do to fix this!
776      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
777    }
778    if (fs.exists(index)) {
779      // no fixing needed
780      return -1;
781    }
782    SequenceFile.Reader dataReader = 
783      new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
784    if (!dataReader.getKeyClass().equals(keyClass)) {
785      throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
786                          ", got " + dataReader.getKeyClass().getName());
787    }
788    if (!dataReader.getValueClass().equals(valueClass)) {
789      throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
790                          ", got " + dataReader.getValueClass().getName());
791    }
792    long cnt = 0L;
793    Writable key = ReflectionUtils.newInstance(keyClass, conf);
794    Writable value = ReflectionUtils.newInstance(valueClass, conf);
795    SequenceFile.Writer indexWriter = null;
796    if (!dryrun) {
797      indexWriter = 
798        SequenceFile.createWriter(conf, 
799                                  SequenceFile.Writer.file(index), 
800                                  SequenceFile.Writer.keyClass(keyClass), 
801                                  SequenceFile.Writer.valueClass
802                                    (LongWritable.class));
803    }
804    try {
805      long pos = 0L;
806      LongWritable position = new LongWritable();
807      while(dataReader.next(key, value)) {
808        cnt++;
809        if (cnt % indexInterval == 0) {
810          position.set(pos);
811          if (!dryrun) indexWriter.append(key, position);
812        }
813        pos = dataReader.getPosition();
814      }
815    } catch(Throwable t) {
816      // truncated data file. swallow it.
817    }
818    dataReader.close();
819    if (!dryrun) indexWriter.close();
820    return cnt;
821  }
822
823
824  public static void main(String[] args) throws Exception {
825    String usage = "Usage: MapFile inFile outFile";
826      
827    if (args.length != 2) {
828      System.err.println(usage);
829      System.exit(-1);
830    }
831      
832    String in = args[0];
833    String out = args[1];
834
835    Configuration conf = new Configuration();
836    FileSystem fs = FileSystem.getLocal(conf);
837    MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
838    MapFile.Writer writer =
839      new MapFile.Writer(conf, fs, out,
840          reader.getKeyClass().asSubclass(WritableComparable.class),
841          reader.getValueClass());
842
843    WritableComparable key =
844      ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
845    Writable value =
846      ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
847
848    while (reader.next(key, value))               // copy all entries
849      writer.append(key, value);
850
851    writer.close();
852  }
853
854}