001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.io.IOUtils;
034import org.apache.hadoop.io.SequenceFile.CompressionType;
035import org.apache.hadoop.io.compress.CompressionCodec;
036import org.apache.hadoop.util.Options;
037import org.apache.hadoop.util.Progressable;
038import org.apache.hadoop.util.ReflectionUtils;
039
040/** A file-based map from keys to values.
041 * 
042 * <p>A map is a directory containing two files, the <code>data</code> file,
043 * containing all keys and values in the map, and a smaller <code>index</code>
044 * file, containing a fraction of the keys.  The fraction is determined by
045 * {@link Writer#getIndexInterval()}.
046 *
047 * <p>The index file is read entirely into memory.  Thus key implementations
048 * should try to keep themselves small.
049 *
050 * <p>Map files are created by adding entries in-order.  To maintain a large
051 * database, perform updates by copying the previous version of a database and
052 * merging in a sorted change list, to create a new version of the database in
053 * a new file.  Sorting large change lists can be done with {@link
054 * SequenceFile.Sorter}.
055 */
056@InterfaceAudience.Public
057@InterfaceStability.Stable
058public class MapFile {
059  private static final Log LOG = LogFactory.getLog(MapFile.class);
060
061  /** The name of the index file. */
062  public static final String INDEX_FILE_NAME = "index";
063
064  /** The name of the data file. */
065  public static final String DATA_FILE_NAME = "data";
066
067  protected MapFile() {}                          // no public ctor
068
069  /** Writes a new map. */
070  public static class Writer implements java.io.Closeable {
071    private SequenceFile.Writer data;
072    private SequenceFile.Writer index;
073
074    final private static String INDEX_INTERVAL = "io.map.index.interval";
075    private int indexInterval = 128;
076
077    private long size;
078    private LongWritable position = new LongWritable();
079
080    // the following fields are used only for checking key order
081    private WritableComparator comparator;
082    private DataInputBuffer inBuf = new DataInputBuffer();
083    private DataOutputBuffer outBuf = new DataOutputBuffer();
084    private WritableComparable lastKey;
085
086    /** What's the position (in bytes) we wrote when we got the last index */
087    private long lastIndexPos = -1;
088
089    /**
090     * What was size when we last wrote an index. Set to MIN_VALUE to ensure that
091     * we have an index at position zero -- midKey will throw an exception if this
092     * is not the case
093     */
094    private long lastIndexKeyCount = Long.MIN_VALUE;
095
096
097    /** Create the named map for keys of the named class. 
098     * @deprecated Use Writer(Configuration, Path, Option...) instead.
099     */
100    @Deprecated
101    public Writer(Configuration conf, FileSystem fs, String dirName,
102                  Class<? extends WritableComparable> keyClass, 
103                  Class valClass) throws IOException {
104      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
105    }
106
107    /** Create the named map for keys of the named class. 
108     * @deprecated Use Writer(Configuration, Path, Option...) instead.
109     */
110    @Deprecated
111    public Writer(Configuration conf, FileSystem fs, String dirName,
112                  Class<? extends WritableComparable> keyClass, Class valClass,
113                  CompressionType compress, 
114                  Progressable progress) throws IOException {
115      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
116           compression(compress), progressable(progress));
117    }
118
119    /** Create the named map for keys of the named class. 
120     * @deprecated Use Writer(Configuration, Path, Option...) instead.
121     */
122    @Deprecated
123    public Writer(Configuration conf, FileSystem fs, String dirName,
124                  Class<? extends WritableComparable> keyClass, Class valClass,
125                  CompressionType compress, CompressionCodec codec,
126                  Progressable progress) throws IOException {
127      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
128           compression(compress, codec), progressable(progress));
129    }
130
131    /** Create the named map for keys of the named class. 
132     * @deprecated Use Writer(Configuration, Path, Option...) instead.
133     */
134    @Deprecated
135    public Writer(Configuration conf, FileSystem fs, String dirName,
136                  Class<? extends WritableComparable> keyClass, Class valClass,
137                  CompressionType compress) throws IOException {
138      this(conf, new Path(dirName), keyClass(keyClass),
139           valueClass(valClass), compression(compress));
140    }
141
142    /** Create the named map using the named key comparator. 
143     * @deprecated Use Writer(Configuration, Path, Option...) instead.
144     */
145    @Deprecated
146    public Writer(Configuration conf, FileSystem fs, String dirName,
147                  WritableComparator comparator, Class valClass
148                  ) throws IOException {
149      this(conf, new Path(dirName), comparator(comparator), 
150           valueClass(valClass));
151    }
152
153    /** Create the named map using the named key comparator. 
154     * @deprecated Use Writer(Configuration, Path, Option...) instead.
155     */
156    @Deprecated
157    public Writer(Configuration conf, FileSystem fs, String dirName,
158                  WritableComparator comparator, Class valClass,
159                  SequenceFile.CompressionType compress) throws IOException {
160      this(conf, new Path(dirName), comparator(comparator),
161           valueClass(valClass), compression(compress));
162    }
163
164    /** Create the named map using the named key comparator. 
165     * @deprecated Use Writer(Configuration, Path, Option...)} instead.
166     */
167    @Deprecated
168    public Writer(Configuration conf, FileSystem fs, String dirName,
169                  WritableComparator comparator, Class valClass,
170                  SequenceFile.CompressionType compress,
171                  Progressable progress) throws IOException {
172      this(conf, new Path(dirName), comparator(comparator),
173           valueClass(valClass), compression(compress),
174           progressable(progress));
175    }
176
177    /** Create the named map using the named key comparator. 
178     * @deprecated Use Writer(Configuration, Path, Option...) instead.
179     */
180    @Deprecated
181    public Writer(Configuration conf, FileSystem fs, String dirName,
182                  WritableComparator comparator, Class valClass,
183                  SequenceFile.CompressionType compress, CompressionCodec codec,
184                  Progressable progress) throws IOException {
185      this(conf, new Path(dirName), comparator(comparator),
186           valueClass(valClass), compression(compress, codec),
187           progressable(progress));
188    }
189    
190    // our options are a superset of sequence file writer options
191    public static interface Option extends SequenceFile.Writer.Option { }
192    
193    private static class KeyClassOption extends Options.ClassOption
194                                        implements Option {
195      KeyClassOption(Class<?> value) {
196        super(value);
197      }
198    }
199    
200    private static class ComparatorOption implements Option {
201      private final WritableComparator value;
202      ComparatorOption(WritableComparator value) {
203        this.value = value;
204      }
205      WritableComparator getValue() {
206        return value;
207      }
208    }
209
210    public static Option keyClass(Class<? extends WritableComparable> value) {
211      return new KeyClassOption(value);
212    }
213    
214    public static Option comparator(WritableComparator value) {
215      return new ComparatorOption(value);
216    }
217
218    public static SequenceFile.Writer.Option valueClass(Class<?> value) {
219      return SequenceFile.Writer.valueClass(value);
220    }
221    
222    public static 
223    SequenceFile.Writer.Option compression(CompressionType type) {
224      return SequenceFile.Writer.compression(type);
225    }
226
227    public static 
228    SequenceFile.Writer.Option compression(CompressionType type,
229        CompressionCodec codec) {
230      return SequenceFile.Writer.compression(type, codec);
231    }
232
233    public static SequenceFile.Writer.Option progressable(Progressable value) {
234      return SequenceFile.Writer.progressable(value);
235    }
236
237    @SuppressWarnings("unchecked")
238    public Writer(Configuration conf, 
239                  Path dirName,
240                  SequenceFile.Writer.Option... opts
241                  ) throws IOException {
242      KeyClassOption keyClassOption = 
243        Options.getOption(KeyClassOption.class, opts);
244      ComparatorOption comparatorOption =
245        Options.getOption(ComparatorOption.class, opts);
246      if ((keyClassOption == null) == (comparatorOption == null)) {
247        throw new IllegalArgumentException("key class or comparator option "
248                                           + "must be set");
249      }
250      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
251
252      Class<? extends WritableComparable> keyClass;
253      if (keyClassOption == null) {
254        this.comparator = comparatorOption.getValue();
255        keyClass = comparator.getKeyClass();
256      } else {
257        keyClass= 
258          (Class<? extends WritableComparable>) keyClassOption.getValue();
259        this.comparator = WritableComparator.get(keyClass, conf);
260      }
261      this.lastKey = comparator.newKey();
262      FileSystem fs = dirName.getFileSystem(conf);
263
264      if (!fs.mkdirs(dirName)) {
265        throw new IOException("Mkdirs failed to create directory " + dirName);
266      }
267      Path dataFile = new Path(dirName, DATA_FILE_NAME);
268      Path indexFile = new Path(dirName, INDEX_FILE_NAME);
269
270      SequenceFile.Writer.Option[] dataOptions =
271        Options.prependOptions(opts, 
272                               SequenceFile.Writer.file(dataFile),
273                               SequenceFile.Writer.keyClass(keyClass));
274      this.data = SequenceFile.createWriter(conf, dataOptions);
275
276      SequenceFile.Writer.Option[] indexOptions =
277        Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
278            SequenceFile.Writer.keyClass(keyClass),
279            SequenceFile.Writer.valueClass(LongWritable.class),
280            SequenceFile.Writer.compression(CompressionType.BLOCK));
281      this.index = SequenceFile.createWriter(conf, indexOptions);      
282    }
283
284    /** The number of entries that are added before an index entry is added.*/
285    public int getIndexInterval() { return indexInterval; }
286
287    /** Sets the index interval.
288     * @see #getIndexInterval()
289     */
290    public void setIndexInterval(int interval) { indexInterval = interval; }
291
292    /** Sets the index interval and stores it in conf
293     * @see #getIndexInterval()
294     */
295    public static void setIndexInterval(Configuration conf, int interval) {
296      conf.setInt(INDEX_INTERVAL, interval);
297    }
298
299    /** Close the map. */
300    @Override
301    public synchronized void close() throws IOException {
302      data.close();
303      index.close();
304    }
305
306    /** Append a key/value pair to the map.  The key must be greater or equal
307     * to the previous key added to the map. */
308    public synchronized void append(WritableComparable key, Writable val)
309      throws IOException {
310
311      checkKey(key);
312
313      long pos = data.getLength();      
314      // Only write an index if we've changed positions. In a block compressed
315      // file, this means we write an entry at the start of each block      
316      if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
317        position.set(pos);                        // point to current eof
318        index.append(key, position);
319        lastIndexPos = pos;
320        lastIndexKeyCount = size;
321      }
322
323      data.append(key, val);                      // append key/value to data
324      size++;
325    }
326
327    private void checkKey(WritableComparable key) throws IOException {
328      // check that keys are well-ordered
329      if (size != 0 && comparator.compare(lastKey, key) > 0)
330        throw new IOException("key out of order: "+key+" after "+lastKey);
331          
332      // update lastKey with a copy of key by writing and reading
333      outBuf.reset();
334      key.write(outBuf);                          // write new key
335
336      inBuf.reset(outBuf.getData(), outBuf.getLength());
337      lastKey.readFields(inBuf);                  // read into lastKey
338    }
339
340  }
341  
342  /** Provide access to an existing map. */
343  public static class Reader implements java.io.Closeable {
344      
345    /** Number of index entries to skip between each entry.  Zero by default.
346     * Setting this to values larger than zero can facilitate opening large map
347     * files using less memory. */
348    private int INDEX_SKIP = 0;
349      
350    private WritableComparator comparator;
351
352    private WritableComparable nextKey;
353    private long seekPosition = -1;
354    private int seekIndex = -1;
355    private long firstPosition;
356
357    // the data, on disk
358    private SequenceFile.Reader data;
359    private SequenceFile.Reader index;
360
361    // whether the index Reader was closed
362    private boolean indexClosed = false;
363
364    // the index, in memory
365    private int count = -1;
366    private WritableComparable[] keys;
367    private long[] positions;
368
369    /** Returns the class of keys in this file. */
370    public Class<?> getKeyClass() { return data.getKeyClass(); }
371
372    /** Returns the class of values in this file. */
373    public Class<?> getValueClass() { return data.getValueClass(); }
374
375    public static interface Option extends SequenceFile.Reader.Option {}
376    
377    public static Option comparator(WritableComparator value) {
378      return new ComparatorOption(value);
379    }
380
381    static class ComparatorOption implements Option {
382      private final WritableComparator value;
383      ComparatorOption(WritableComparator value) {
384        this.value = value;
385      }
386      WritableComparator getValue() {
387        return value;
388      }
389    }
390
391    public Reader(Path dir, Configuration conf,
392                  SequenceFile.Reader.Option... opts) throws IOException {
393      ComparatorOption comparatorOption = 
394        Options.getOption(ComparatorOption.class, opts);
395      WritableComparator comparator =
396        comparatorOption == null ? null : comparatorOption.getValue();
397      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
398      open(dir, comparator, conf, opts);
399    }
400 
401    /** Construct a map reader for the named map.
402     * @deprecated
403     */
404    @Deprecated
405    public Reader(FileSystem fs, String dirName, 
406                  Configuration conf) throws IOException {
407      this(new Path(dirName), conf);
408    }
409
410    /** Construct a map reader for the named map using the named comparator.
411     * @deprecated
412     */
413    @Deprecated
414    public Reader(FileSystem fs, String dirName, WritableComparator comparator, 
415                  Configuration conf) throws IOException {
416      this(new Path(dirName), conf, comparator(comparator));
417    }
418    
419    protected synchronized void open(Path dir,
420                                     WritableComparator comparator,
421                                     Configuration conf, 
422                                     SequenceFile.Reader.Option... options
423                                     ) throws IOException {
424      Path dataFile = new Path(dir, DATA_FILE_NAME);
425      Path indexFile = new Path(dir, INDEX_FILE_NAME);
426
427      // open the data
428      this.data = createDataFileReader(dataFile, conf, options);
429      this.firstPosition = data.getPosition();
430
431      if (comparator == null) {
432        Class<? extends WritableComparable> cls;
433        cls = data.getKeyClass().asSubclass(WritableComparable.class);
434        this.comparator = WritableComparator.get(cls, conf);
435      } else {
436        this.comparator = comparator;
437      }
438
439      // open the index
440      SequenceFile.Reader.Option[] indexOptions =
441        Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
442      this.index = new SequenceFile.Reader(conf, indexOptions);
443    }
444
445    /**
446     * Override this method to specialize the type of
447     * {@link SequenceFile.Reader} returned.
448     */
449    protected SequenceFile.Reader 
450      createDataFileReader(Path dataFile, Configuration conf,
451                           SequenceFile.Reader.Option... options
452                           ) throws IOException {
453      SequenceFile.Reader.Option[] newOptions =
454        Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
455      return new SequenceFile.Reader(conf, newOptions);
456    }
457
458    private void readIndex() throws IOException {
459      // read the index entirely into memory
460      if (this.keys != null)
461        return;
462      this.count = 0;
463      this.positions = new long[1024];
464
465      try {
466        int skip = INDEX_SKIP;
467        LongWritable position = new LongWritable();
468        WritableComparable lastKey = null;
469        long lastIndex = -1;
470        ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
471        while (true) {
472          WritableComparable k = comparator.newKey();
473
474          if (!index.next(k, position))
475            break;
476
477          // check order to make sure comparator is compatible
478          if (lastKey != null && comparator.compare(lastKey, k) > 0)
479            throw new IOException("key out of order: "+k+" after "+lastKey);
480          lastKey = k;
481          if (skip > 0) {
482            skip--;
483            continue;                             // skip this entry
484          } else {
485            skip = INDEX_SKIP;                    // reset skip
486          }
487
488          // don't read an index that is the same as the previous one. Block
489          // compressed map files used to do this (multiple entries would point
490          // at the same block)
491          if (position.get() == lastIndex)
492            continue;
493
494          if (count == positions.length) {
495            positions = Arrays.copyOf(positions, positions.length * 2);
496          }
497
498          keyBuilder.add(k);
499          positions[count] = position.get();
500          count++;
501        }
502
503        this.keys = keyBuilder.toArray(new WritableComparable[count]);
504        positions = Arrays.copyOf(positions, count);
505      } catch (EOFException e) {
506        LOG.warn("Unexpected EOF reading " + index +
507                              " at entry #" + count + ".  Ignoring.");
508      } finally {
509        indexClosed = true;
510        index.close();
511      }
512    }
513
514    /** Re-positions the reader before its first key. */
515    public synchronized void reset() throws IOException {
516      data.seek(firstPosition);
517    }
518
519    /** Get the key at approximately the middle of the file. Or null if the
520     *  file is empty. 
521     */
522    public synchronized WritableComparable midKey() throws IOException {
523
524      readIndex();
525      if (count == 0) {
526        return null;
527      }
528    
529      return keys[(count - 1) / 2];
530    }
531    
532    /** Reads the final key from the file.
533     *
534     * @param key key to read into
535     */
536    public synchronized void finalKey(WritableComparable key)
537      throws IOException {
538
539      long originalPosition = data.getPosition(); // save position
540      try {
541        readIndex();                              // make sure index is valid
542        if (count > 0) {
543          data.seek(positions[count-1]);          // skip to last indexed entry
544        } else {
545          reset();                                // start at the beginning
546        }
547        while (data.next(key)) {}                 // scan to eof
548
549      } finally {
550        data.seek(originalPosition);              // restore position
551      }
552    }
553
554    /** Positions the reader at the named key, or if none such exists, at the
555     * first entry after the named key.  Returns true iff the named key exists
556     * in this map.
557     */
558    public synchronized boolean seek(WritableComparable key) throws IOException {
559      return seekInternal(key) == 0;
560    }
561
562    /** 
563     * Positions the reader at the named key, or if none such exists, at the
564     * first entry after the named key.
565     *
566     * @return  0   - exact match found
567     *          < 0 - positioned at next record
568     *          1   - no more records in file
569     */
570    private synchronized int seekInternal(WritableComparable key)
571      throws IOException {
572      return seekInternal(key, false);
573    }
574
575    /** 
576     * Positions the reader at the named key, or if none such exists, at the
577     * key that falls just before or just after dependent on how the
578     * <code>before</code> parameter is set.
579     * 
580     * @param before - IF true, and <code>key</code> does not exist, position
581     * file at entry that falls just before <code>key</code>.  Otherwise,
582     * position file at record that sorts just after.
583     * @return  0   - exact match found
584     *          < 0 - positioned at next record
585     *          1   - no more records in file
586     */
587    private synchronized int seekInternal(WritableComparable key,
588        final boolean before)
589      throws IOException {
590      readIndex();                                // make sure index is read
591
592      if (seekIndex != -1                         // seeked before
593          && seekIndex+1 < count           
594          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
595          && comparator.compare(key, nextKey)
596          >= 0) {                                 // but after last seeked
597        // do nothing
598      } else {
599        seekIndex = binarySearch(key);
600        if (seekIndex < 0)                        // decode insertion point
601          seekIndex = -seekIndex-2;
602
603        if (seekIndex == -1)                      // belongs before first entry
604          seekPosition = firstPosition;           // use beginning of file
605        else
606          seekPosition = positions[seekIndex];    // else use index
607      }
608      data.seek(seekPosition);
609      
610      if (nextKey == null)
611        nextKey = comparator.newKey();
612     
613      // If we're looking for the key before, we need to keep track
614      // of the position we got the current key as well as the position
615      // of the key before it.
616      long prevPosition = -1;
617      long curPosition = seekPosition;
618
619      while (data.next(nextKey)) {
620        int c = comparator.compare(key, nextKey);
621        if (c <= 0) {                             // at or beyond desired
622          if (before && c != 0) {
623            if (prevPosition == -1) {
624              // We're on the first record of this index block
625              // and we've already passed the search key. Therefore
626              // we must be at the beginning of the file, so seek
627              // to the beginning of this block and return c
628              data.seek(curPosition);
629            } else {
630              // We have a previous record to back up to
631              data.seek(prevPosition);
632              data.next(nextKey);
633              // now that we've rewound, the search key must be greater than this key
634              return 1;
635            }
636          }
637          return c;
638        }
639        if (before) {
640          prevPosition = curPosition;
641          curPosition = data.getPosition();
642        }
643      }
644
645      return 1;
646    }
647
648    private int binarySearch(WritableComparable key) {
649      int low = 0;
650      int high = count-1;
651
652      while (low <= high) {
653        int mid = (low + high) >>> 1;
654        WritableComparable midVal = keys[mid];
655        int cmp = comparator.compare(midVal, key);
656
657        if (cmp < 0)
658          low = mid + 1;
659        else if (cmp > 0)
660          high = mid - 1;
661        else
662          return mid;                             // key found
663      }
664      return -(low + 1);                          // key not found.
665    }
666
667    /** Read the next key/value pair in the map into <code>key</code> and
668     * <code>val</code>.  Returns true if such a pair exists and false when at
669     * the end of the map */
670    public synchronized boolean next(WritableComparable key, Writable val)
671      throws IOException {
672      return data.next(key, val);
673    }
674
675    /** Return the value for the named key, or null if none exists. */
676    public synchronized Writable get(WritableComparable key, Writable val)
677      throws IOException {
678      if (seek(key)) {
679        data.getCurrentValue(val);
680        return val;
681      } else
682        return null;
683    }
684
685    /** 
686     * Finds the record that is the closest match to the specified key.
687     * Returns <code>key</code> or if it does not exist, at the first entry
688     * after the named key.
689     * 
690-     * @param key       - key that we're trying to find
691-     * @param val       - data value if key is found
692-     * @return          - the key that was the closest match or null if eof.
693     */
694    public synchronized WritableComparable getClosest(WritableComparable key,
695      Writable val)
696    throws IOException {
697      return getClosest(key, val, false);
698    }
699
700    /** 
701     * Finds the record that is the closest match to the specified key.
702     * 
703     * @param key       - key that we're trying to find
704     * @param val       - data value if key is found
705     * @param before    - IF true, and <code>key</code> does not exist, return
706     * the first entry that falls just before the <code>key</code>.  Otherwise,
707     * return the record that sorts just after.
708     * @return          - the key that was the closest match or null if eof.
709     */
710    public synchronized WritableComparable getClosest(WritableComparable key,
711        Writable val, final boolean before)
712      throws IOException {
713     
714      int c = seekInternal(key, before);
715
716      // If we didn't get an exact match, and we ended up in the wrong
717      // direction relative to the query key, return null since we
718      // must be at the beginning or end of the file.
719      if ((!before && c > 0) ||
720          (before && c < 0)) {
721        return null;
722      }
723
724      data.getCurrentValue(val);
725      return nextKey;
726    }
727
728    /** Close the map. */
729    @Override
730    public synchronized void close() throws IOException {
731      if (!indexClosed) {
732        index.close();
733      }
734      data.close();
735    }
736
737  }
738
739  /** Renames an existing map directory. */
740  public static void rename(FileSystem fs, String oldName, String newName)
741    throws IOException {
742    Path oldDir = new Path(oldName);
743    Path newDir = new Path(newName);
744    if (!fs.rename(oldDir, newDir)) {
745      throw new IOException("Could not rename " + oldDir + " to " + newDir);
746    }
747  }
748
749  /** Deletes the named map file. */
750  public static void delete(FileSystem fs, String name) throws IOException {
751    Path dir = new Path(name);
752    Path data = new Path(dir, DATA_FILE_NAME);
753    Path index = new Path(dir, INDEX_FILE_NAME);
754
755    fs.delete(data, true);
756    fs.delete(index, true);
757    fs.delete(dir, true);
758  }
759
760  /**
761   * This method attempts to fix a corrupt MapFile by re-creating its index.
762   * @param fs filesystem
763   * @param dir directory containing the MapFile data and index
764   * @param keyClass key class (has to be a subclass of Writable)
765   * @param valueClass value class (has to be a subclass of Writable)
766   * @param dryrun do not perform any changes, just report what needs to be done
767   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
768   * @throws Exception
769   */
770  public static long fix(FileSystem fs, Path dir,
771                         Class<? extends Writable> keyClass,
772                         Class<? extends Writable> valueClass, boolean dryrun,
773                         Configuration conf) throws Exception {
774    String dr = (dryrun ? "[DRY RUN ] " : "");
775    Path data = new Path(dir, DATA_FILE_NAME);
776    Path index = new Path(dir, INDEX_FILE_NAME);
777    int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
778    if (!fs.exists(data)) {
779      // there's nothing we can do to fix this!
780      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
781    }
782    if (fs.exists(index)) {
783      // no fixing needed
784      return -1;
785    }
786    SequenceFile.Reader dataReader = 
787      new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
788    if (!dataReader.getKeyClass().equals(keyClass)) {
789      throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
790                          ", got " + dataReader.getKeyClass().getName());
791    }
792    if (!dataReader.getValueClass().equals(valueClass)) {
793      throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
794                          ", got " + dataReader.getValueClass().getName());
795    }
796    long cnt = 0L;
797    Writable key = ReflectionUtils.newInstance(keyClass, conf);
798    Writable value = ReflectionUtils.newInstance(valueClass, conf);
799    SequenceFile.Writer indexWriter = null;
800    if (!dryrun) {
801      indexWriter = 
802        SequenceFile.createWriter(conf, 
803                                  SequenceFile.Writer.file(index), 
804                                  SequenceFile.Writer.keyClass(keyClass), 
805                                  SequenceFile.Writer.valueClass
806                                    (LongWritable.class));
807    }
808    try {
809      long pos = 0L;
810      LongWritable position = new LongWritable();
811      while(dataReader.next(key, value)) {
812        cnt++;
813        if (cnt % indexInterval == 0) {
814          position.set(pos);
815          if (!dryrun) indexWriter.append(key, position);
816        }
817        pos = dataReader.getPosition();
818      }
819    } catch(Throwable t) {
820      // truncated data file. swallow it.
821    }
822    dataReader.close();
823    if (!dryrun) indexWriter.close();
824    return cnt;
825  }
826
827
828  public static void main(String[] args) throws Exception {
829    String usage = "Usage: MapFile inFile outFile";
830      
831    if (args.length != 2) {
832      System.err.println(usage);
833      System.exit(-1);
834    }
835      
836    String in = args[0];
837    String out = args[1];
838
839    Configuration conf = new Configuration();
840    FileSystem fs = FileSystem.getLocal(conf);
841    MapFile.Reader reader = null;
842    MapFile.Writer writer = null;
843    try {
844      reader = new MapFile.Reader(fs, in, conf);
845      writer =
846        new MapFile.Writer(conf, fs, out,
847            reader.getKeyClass().asSubclass(WritableComparable.class),
848            reader.getValueClass());
849
850      WritableComparable key = ReflectionUtils.newInstance(reader.getKeyClass()
851        .asSubclass(WritableComparable.class), conf);
852      Writable value = ReflectionUtils.newInstance(reader.getValueClass()
853        .asSubclass(Writable.class), conf);
854
855      while (reader.next(key, value))               // copy all entries
856        writer.append(key, value);
857    } finally {
858      IOUtils.cleanup(LOG, writer, reader);
859    }
860  }
861}