001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Arrays;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.HadoopIllegalArgumentException;
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.io.IOUtils;
035import org.apache.hadoop.io.SequenceFile.CompressionType;
036import org.apache.hadoop.io.compress.CompressionCodec;
037import org.apache.hadoop.util.Options;
038import org.apache.hadoop.util.Progressable;
039import org.apache.hadoop.util.ReflectionUtils;
040
041import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAP_INDEX_SKIP_DEFAULT;
042import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAP_INDEX_SKIP_KEY;
043
044/** A file-based map from keys to values.
045 * 
046 * <p>A map is a directory containing two files, the <code>data</code> file,
047 * containing all keys and values in the map, and a smaller <code>index</code>
048 * file, containing a fraction of the keys.  The fraction is determined by
049 * {@link Writer#getIndexInterval()}.
050 *
051 * <p>The index file is read entirely into memory.  Thus key implementations
052 * should try to keep themselves small.
053 *
054 * <p>Map files are created by adding entries in-order.  To maintain a large
055 * database, perform updates by copying the previous version of a database and
056 * merging in a sorted change list, to create a new version of the database in
057 * a new file.  Sorting large change lists can be done with {@link
058 * SequenceFile.Sorter}.
059 */
060@InterfaceAudience.Public
061@InterfaceStability.Stable
062public class MapFile {
063  private static final Log LOG = LogFactory.getLog(MapFile.class);
064
065  /** The name of the index file. */
066  public static final String INDEX_FILE_NAME = "index";
067
068  /** The name of the data file. */
069  public static final String DATA_FILE_NAME = "data";
070
071  protected MapFile() {}                          // no public ctor
072
073  /** Writes a new map. */
074  public static class Writer implements java.io.Closeable {
075    private SequenceFile.Writer data;
076    private SequenceFile.Writer index;
077
078    final private static String INDEX_INTERVAL = "io.map.index.interval";
079    private int indexInterval = 128;
080
081    private long size;
082    private LongWritable position = new LongWritable();
083
084    // the following fields are used only for checking key order
085    private WritableComparator comparator;
086    private DataInputBuffer inBuf = new DataInputBuffer();
087    private DataOutputBuffer outBuf = new DataOutputBuffer();
088    private WritableComparable lastKey;
089
090    /** What's the position (in bytes) we wrote when we got the last index */
091    private long lastIndexPos = -1;
092
093    /**
094     * What was size when we last wrote an index. Set to MIN_VALUE to ensure that
095     * we have an index at position zero -- midKey will throw an exception if this
096     * is not the case
097     */
098    private long lastIndexKeyCount = Long.MIN_VALUE;
099
100
101    /** Create the named map for keys of the named class. 
102     * @deprecated Use Writer(Configuration, Path, Option...) instead.
103     */
104    @Deprecated
105    public Writer(Configuration conf, FileSystem fs, String dirName,
106                  Class<? extends WritableComparable> keyClass, 
107                  Class valClass) throws IOException {
108      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
109    }
110
111    /** Create the named map for keys of the named class. 
112     * @deprecated Use Writer(Configuration, Path, Option...) instead.
113     */
114    @Deprecated
115    public Writer(Configuration conf, FileSystem fs, String dirName,
116                  Class<? extends WritableComparable> keyClass, Class valClass,
117                  CompressionType compress, 
118                  Progressable progress) throws IOException {
119      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
120           compression(compress), progressable(progress));
121    }
122
123    /** Create the named map for keys of the named class. 
124     * @deprecated Use Writer(Configuration, Path, Option...) instead.
125     */
126    @Deprecated
127    public Writer(Configuration conf, FileSystem fs, String dirName,
128                  Class<? extends WritableComparable> keyClass, Class valClass,
129                  CompressionType compress, CompressionCodec codec,
130                  Progressable progress) throws IOException {
131      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
132           compression(compress, codec), progressable(progress));
133    }
134
135    /** Create the named map for keys of the named class. 
136     * @deprecated Use Writer(Configuration, Path, Option...) instead.
137     */
138    @Deprecated
139    public Writer(Configuration conf, FileSystem fs, String dirName,
140                  Class<? extends WritableComparable> keyClass, Class valClass,
141                  CompressionType compress) throws IOException {
142      this(conf, new Path(dirName), keyClass(keyClass),
143           valueClass(valClass), compression(compress));
144    }
145
146    /** Create the named map using the named key comparator. 
147     * @deprecated Use Writer(Configuration, Path, Option...) instead.
148     */
149    @Deprecated
150    public Writer(Configuration conf, FileSystem fs, String dirName,
151                  WritableComparator comparator, Class valClass
152                  ) throws IOException {
153      this(conf, new Path(dirName), comparator(comparator), 
154           valueClass(valClass));
155    }
156
157    /** Create the named map using the named key comparator. 
158     * @deprecated Use Writer(Configuration, Path, Option...) instead.
159     */
160    @Deprecated
161    public Writer(Configuration conf, FileSystem fs, String dirName,
162                  WritableComparator comparator, Class valClass,
163                  SequenceFile.CompressionType compress) throws IOException {
164      this(conf, new Path(dirName), comparator(comparator),
165           valueClass(valClass), compression(compress));
166    }
167
168    /** Create the named map using the named key comparator. 
169     * @deprecated Use Writer(Configuration, Path, Option...)} instead.
170     */
171    @Deprecated
172    public Writer(Configuration conf, FileSystem fs, String dirName,
173                  WritableComparator comparator, Class valClass,
174                  SequenceFile.CompressionType compress,
175                  Progressable progress) throws IOException {
176      this(conf, new Path(dirName), comparator(comparator),
177           valueClass(valClass), compression(compress),
178           progressable(progress));
179    }
180
181    /** Create the named map using the named key comparator. 
182     * @deprecated Use Writer(Configuration, Path, Option...) instead.
183     */
184    @Deprecated
185    public Writer(Configuration conf, FileSystem fs, String dirName,
186                  WritableComparator comparator, Class valClass,
187                  SequenceFile.CompressionType compress, CompressionCodec codec,
188                  Progressable progress) throws IOException {
189      this(conf, new Path(dirName), comparator(comparator),
190           valueClass(valClass), compression(compress, codec),
191           progressable(progress));
192    }
193    
194    // our options are a superset of sequence file writer options
195    public static interface Option extends SequenceFile.Writer.Option { }
196    
197    private static class KeyClassOption extends Options.ClassOption
198                                        implements Option {
199      KeyClassOption(Class<?> value) {
200        super(value);
201      }
202    }
203    
204    private static class ComparatorOption implements Option {
205      private final WritableComparator value;
206      ComparatorOption(WritableComparator value) {
207        this.value = value;
208      }
209      WritableComparator getValue() {
210        return value;
211      }
212    }
213
214    public static Option keyClass(Class<? extends WritableComparable> value) {
215      return new KeyClassOption(value);
216    }
217    
218    public static Option comparator(WritableComparator value) {
219      return new ComparatorOption(value);
220    }
221
222    public static SequenceFile.Writer.Option valueClass(Class<?> value) {
223      return SequenceFile.Writer.valueClass(value);
224    }
225    
226    public static 
227    SequenceFile.Writer.Option compression(CompressionType type) {
228      return SequenceFile.Writer.compression(type);
229    }
230
231    public static 
232    SequenceFile.Writer.Option compression(CompressionType type,
233        CompressionCodec codec) {
234      return SequenceFile.Writer.compression(type, codec);
235    }
236
237    public static SequenceFile.Writer.Option progressable(Progressable value) {
238      return SequenceFile.Writer.progressable(value);
239    }
240
241    @SuppressWarnings("unchecked")
242    public Writer(Configuration conf, 
243                  Path dirName,
244                  SequenceFile.Writer.Option... opts
245                  ) throws IOException {
246      KeyClassOption keyClassOption = 
247        Options.getOption(KeyClassOption.class, opts);
248      ComparatorOption comparatorOption =
249        Options.getOption(ComparatorOption.class, opts);
250      if ((keyClassOption == null) == (comparatorOption == null)) {
251        throw new IllegalArgumentException("key class or comparator option "
252                                           + "must be set");
253      }
254      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
255
256      Class<? extends WritableComparable> keyClass;
257      if (keyClassOption == null) {
258        this.comparator = comparatorOption.getValue();
259        keyClass = comparator.getKeyClass();
260      } else {
261        keyClass= 
262          (Class<? extends WritableComparable>) keyClassOption.getValue();
263        this.comparator = WritableComparator.get(keyClass, conf);
264      }
265      this.lastKey = comparator.newKey();
266      FileSystem fs = dirName.getFileSystem(conf);
267
268      if (!fs.mkdirs(dirName)) {
269        throw new IOException("Mkdirs failed to create directory " + dirName);
270      }
271      Path dataFile = new Path(dirName, DATA_FILE_NAME);
272      Path indexFile = new Path(dirName, INDEX_FILE_NAME);
273
274      SequenceFile.Writer.Option[] dataOptions =
275        Options.prependOptions(opts, 
276                               SequenceFile.Writer.file(dataFile),
277                               SequenceFile.Writer.keyClass(keyClass));
278      this.data = SequenceFile.createWriter(conf, dataOptions);
279
280      SequenceFile.Writer.Option[] indexOptions =
281        Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
282            SequenceFile.Writer.keyClass(keyClass),
283            SequenceFile.Writer.valueClass(LongWritable.class),
284            SequenceFile.Writer.compression(CompressionType.BLOCK));
285      this.index = SequenceFile.createWriter(conf, indexOptions);      
286    }
287
288    /** The number of entries that are added before an index entry is added.*/
289    public int getIndexInterval() { return indexInterval; }
290
291    /** Sets the index interval.
292     * @see #getIndexInterval()
293     */
294    public void setIndexInterval(int interval) { indexInterval = interval; }
295
296    /** Sets the index interval and stores it in conf
297     * @see #getIndexInterval()
298     */
299    public static void setIndexInterval(Configuration conf, int interval) {
300      conf.setInt(INDEX_INTERVAL, interval);
301    }
302
303    /** Close the map. */
304    @Override
305    public synchronized void close() throws IOException {
306      data.close();
307      index.close();
308    }
309
310    /** Append a key/value pair to the map.  The key must be greater or equal
311     * to the previous key added to the map. */
312    public synchronized void append(WritableComparable key, Writable val)
313      throws IOException {
314
315      checkKey(key);
316
317      long pos = data.getLength();      
318      // Only write an index if we've changed positions. In a block compressed
319      // file, this means we write an entry at the start of each block      
320      if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
321        position.set(pos);                        // point to current eof
322        index.append(key, position);
323        lastIndexPos = pos;
324        lastIndexKeyCount = size;
325      }
326
327      data.append(key, val);                      // append key/value to data
328      size++;
329    }
330
331    private void checkKey(WritableComparable key) throws IOException {
332      // check that keys are well-ordered
333      if (size != 0 && comparator.compare(lastKey, key) > 0)
334        throw new IOException("key out of order: "+key+" after "+lastKey);
335          
336      // update lastKey with a copy of key by writing and reading
337      outBuf.reset();
338      key.write(outBuf);                          // write new key
339
340      inBuf.reset(outBuf.getData(), outBuf.getLength());
341      lastKey.readFields(inBuf);                  // read into lastKey
342    }
343
344  }
345  
346  /** Provide access to an existing map. */
347  public static class Reader implements java.io.Closeable {
348      
349    /** Number of index entries to skip between each entry.  Zero by default.
350     * Setting this to values larger than zero can facilitate opening large map
351     * files using less memory. */
352    private int INDEX_SKIP = 0;
353      
354    private WritableComparator comparator;
355
356    private WritableComparable nextKey;
357    private long seekPosition = -1;
358    private int seekIndex = -1;
359    private long firstPosition;
360
361    // the data, on disk
362    private SequenceFile.Reader data;
363    private SequenceFile.Reader index;
364
365    // whether the index Reader was closed
366    private boolean indexClosed = false;
367
368    // the index, in memory
369    private int count = -1;
370    private WritableComparable[] keys;
371    private long[] positions;
372
373    /** Returns the class of keys in this file. */
374    public Class<?> getKeyClass() { return data.getKeyClass(); }
375
376    /** Returns the class of values in this file. */
377    public Class<?> getValueClass() { return data.getValueClass(); }
378
379    public static interface Option extends SequenceFile.Reader.Option {}
380    
381    public static Option comparator(WritableComparator value) {
382      return new ComparatorOption(value);
383    }
384
385    static class ComparatorOption implements Option {
386      private final WritableComparator value;
387      ComparatorOption(WritableComparator value) {
388        this.value = value;
389      }
390      WritableComparator getValue() {
391        return value;
392      }
393    }
394
395    public Reader(Path dir, Configuration conf,
396                  SequenceFile.Reader.Option... opts) throws IOException {
397      ComparatorOption comparatorOption = 
398        Options.getOption(ComparatorOption.class, opts);
399      WritableComparator comparator =
400        comparatorOption == null ? null : comparatorOption.getValue();
401      INDEX_SKIP = conf.getInt(
402          IO_MAP_INDEX_SKIP_KEY, IO_MAP_INDEX_SKIP_DEFAULT);
403      open(dir, comparator, conf, opts);
404    }
405 
406    /** Construct a map reader for the named map.
407     * @deprecated
408     */
409    @Deprecated
410    public Reader(FileSystem fs, String dirName, 
411                  Configuration conf) throws IOException {
412      this(new Path(dirName), conf);
413    }
414
415    /** Construct a map reader for the named map using the named comparator.
416     * @deprecated
417     */
418    @Deprecated
419    public Reader(FileSystem fs, String dirName, WritableComparator comparator, 
420                  Configuration conf) throws IOException {
421      this(new Path(dirName), conf, comparator(comparator));
422    }
423    
424    protected synchronized void open(Path dir,
425                                     WritableComparator comparator,
426                                     Configuration conf, 
427                                     SequenceFile.Reader.Option... options
428                                     ) throws IOException {
429      Path dataFile = new Path(dir, DATA_FILE_NAME);
430      Path indexFile = new Path(dir, INDEX_FILE_NAME);
431
432      // open the data
433      this.data = createDataFileReader(dataFile, conf, options);
434      this.firstPosition = data.getPosition();
435
436      if (comparator == null) {
437        Class<? extends WritableComparable> cls;
438        cls = data.getKeyClass().asSubclass(WritableComparable.class);
439        this.comparator = WritableComparator.get(cls, conf);
440      } else {
441        this.comparator = comparator;
442      }
443
444      // open the index
445      SequenceFile.Reader.Option[] indexOptions =
446        Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
447      this.index = new SequenceFile.Reader(conf, indexOptions);
448    }
449
450    /**
451     * Override this method to specialize the type of
452     * {@link SequenceFile.Reader} returned.
453     */
454    protected SequenceFile.Reader 
455      createDataFileReader(Path dataFile, Configuration conf,
456                           SequenceFile.Reader.Option... options
457                           ) throws IOException {
458      SequenceFile.Reader.Option[] newOptions =
459        Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
460      return new SequenceFile.Reader(conf, newOptions);
461    }
462
463    private void readIndex() throws IOException {
464      // read the index entirely into memory
465      if (this.keys != null)
466        return;
467      this.count = 0;
468      this.positions = new long[1024];
469
470      try {
471        int skip = INDEX_SKIP;
472        LongWritable position = new LongWritable();
473        WritableComparable lastKey = null;
474        long lastIndex = -1;
475        ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
476        while (true) {
477          WritableComparable k = comparator.newKey();
478
479          if (!index.next(k, position))
480            break;
481
482          // check order to make sure comparator is compatible
483          if (lastKey != null && comparator.compare(lastKey, k) > 0)
484            throw new IOException("key out of order: "+k+" after "+lastKey);
485          lastKey = k;
486          if (skip > 0) {
487            skip--;
488            continue;                             // skip this entry
489          } else {
490            skip = INDEX_SKIP;                    // reset skip
491          }
492
493          // don't read an index that is the same as the previous one. Block
494          // compressed map files used to do this (multiple entries would point
495          // at the same block)
496          if (position.get() == lastIndex)
497            continue;
498
499          if (count == positions.length) {
500            positions = Arrays.copyOf(positions, positions.length * 2);
501          }
502
503          keyBuilder.add(k);
504          positions[count] = position.get();
505          count++;
506        }
507
508        this.keys = keyBuilder.toArray(new WritableComparable[count]);
509        positions = Arrays.copyOf(positions, count);
510      } catch (EOFException e) {
511        LOG.warn("Unexpected EOF reading " + index +
512                              " at entry #" + count + ".  Ignoring.");
513      } finally {
514        indexClosed = true;
515        index.close();
516      }
517    }
518
519    /** Re-positions the reader before its first key. */
520    public synchronized void reset() throws IOException {
521      data.seek(firstPosition);
522    }
523
524    /** Get the key at approximately the middle of the file. Or null if the
525     *  file is empty. 
526     */
527    public synchronized WritableComparable midKey() throws IOException {
528
529      readIndex();
530      if (count == 0) {
531        return null;
532      }
533    
534      return keys[(count - 1) / 2];
535    }
536    
537    /** Reads the final key from the file.
538     *
539     * @param key key to read into
540     */
541    public synchronized void finalKey(WritableComparable key)
542      throws IOException {
543
544      long originalPosition = data.getPosition(); // save position
545      try {
546        readIndex();                              // make sure index is valid
547        if (count > 0) {
548          data.seek(positions[count-1]);          // skip to last indexed entry
549        } else {
550          reset();                                // start at the beginning
551        }
552        while (data.next(key)) {}                 // scan to eof
553
554      } finally {
555        data.seek(originalPosition);              // restore position
556      }
557    }
558
559    /** Positions the reader at the named key, or if none such exists, at the
560     * first entry after the named key.  Returns true iff the named key exists
561     * in this map.
562     */
563    public synchronized boolean seek(WritableComparable key) throws IOException {
564      return seekInternal(key) == 0;
565    }
566
567    /** 
568     * Positions the reader at the named key, or if none such exists, at the
569     * first entry after the named key.
570     *
571     * @return  0   - exact match found
572     *          < 0 - positioned at next record
573     *          1   - no more records in file
574     */
575    private synchronized int seekInternal(WritableComparable key)
576      throws IOException {
577      return seekInternal(key, false);
578    }
579
580    /** 
581     * Positions the reader at the named key, or if none such exists, at the
582     * key that falls just before or just after dependent on how the
583     * <code>before</code> parameter is set.
584     * 
585     * @param before - IF true, and <code>key</code> does not exist, position
586     * file at entry that falls just before <code>key</code>.  Otherwise,
587     * position file at record that sorts just after.
588     * @return  0   - exact match found
589     *          < 0 - positioned at next record
590     *          1   - no more records in file
591     */
592    private synchronized int seekInternal(WritableComparable key,
593        final boolean before)
594      throws IOException {
595      readIndex();                                // make sure index is read
596
597      if (seekIndex != -1                         // seeked before
598          && seekIndex+1 < count           
599          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
600          && comparator.compare(key, nextKey)
601          >= 0) {                                 // but after last seeked
602        // do nothing
603      } else {
604        seekIndex = binarySearch(key);
605        if (seekIndex < 0)                        // decode insertion point
606          seekIndex = -seekIndex-2;
607
608        if (seekIndex == -1)                      // belongs before first entry
609          seekPosition = firstPosition;           // use beginning of file
610        else
611          seekPosition = positions[seekIndex];    // else use index
612      }
613      data.seek(seekPosition);
614      
615      if (nextKey == null)
616        nextKey = comparator.newKey();
617     
618      // If we're looking for the key before, we need to keep track
619      // of the position we got the current key as well as the position
620      // of the key before it.
621      long prevPosition = -1;
622      long curPosition = seekPosition;
623
624      while (data.next(nextKey)) {
625        int c = comparator.compare(key, nextKey);
626        if (c <= 0) {                             // at or beyond desired
627          if (before && c != 0) {
628            if (prevPosition == -1) {
629              // We're on the first record of this index block
630              // and we've already passed the search key. Therefore
631              // we must be at the beginning of the file, so seek
632              // to the beginning of this block and return c
633              data.seek(curPosition);
634            } else {
635              // We have a previous record to back up to
636              data.seek(prevPosition);
637              data.next(nextKey);
638              // now that we've rewound, the search key must be greater than this key
639              return 1;
640            }
641          }
642          return c;
643        }
644        if (before) {
645          prevPosition = curPosition;
646          curPosition = data.getPosition();
647        }
648      }
649
650      return 1;
651    }
652
653    private int binarySearch(WritableComparable key) {
654      int low = 0;
655      int high = count-1;
656
657      while (low <= high) {
658        int mid = (low + high) >>> 1;
659        WritableComparable midVal = keys[mid];
660        int cmp = comparator.compare(midVal, key);
661
662        if (cmp < 0)
663          low = mid + 1;
664        else if (cmp > 0)
665          high = mid - 1;
666        else
667          return mid;                             // key found
668      }
669      return -(low + 1);                          // key not found.
670    }
671
672    /** Read the next key/value pair in the map into <code>key</code> and
673     * <code>val</code>.  Returns true if such a pair exists and false when at
674     * the end of the map */
675    public synchronized boolean next(WritableComparable key, Writable val)
676      throws IOException {
677      return data.next(key, val);
678    }
679
680    /** Return the value for the named key, or null if none exists. */
681    public synchronized Writable get(WritableComparable key, Writable val)
682      throws IOException {
683      if (seek(key)) {
684        data.getCurrentValue(val);
685        return val;
686      } else
687        return null;
688    }
689
690    /** 
691     * Finds the record that is the closest match to the specified key.
692     * Returns <code>key</code> or if it does not exist, at the first entry
693     * after the named key.
694     * 
695-     * @param key       - key that we're trying to find
696-     * @param val       - data value if key is found
697-     * @return          - the key that was the closest match or null if eof.
698     */
699    public synchronized WritableComparable getClosest(WritableComparable key,
700      Writable val)
701    throws IOException {
702      return getClosest(key, val, false);
703    }
704
705    /** 
706     * Finds the record that is the closest match to the specified key.
707     * 
708     * @param key       - key that we're trying to find
709     * @param val       - data value if key is found
710     * @param before    - IF true, and <code>key</code> does not exist, return
711     * the first entry that falls just before the <code>key</code>.  Otherwise,
712     * return the record that sorts just after.
713     * @return          - the key that was the closest match or null if eof.
714     */
715    public synchronized WritableComparable getClosest(WritableComparable key,
716        Writable val, final boolean before)
717      throws IOException {
718     
719      int c = seekInternal(key, before);
720
721      // If we didn't get an exact match, and we ended up in the wrong
722      // direction relative to the query key, return null since we
723      // must be at the beginning or end of the file.
724      if ((!before && c > 0) ||
725          (before && c < 0)) {
726        return null;
727      }
728
729      data.getCurrentValue(val);
730      return nextKey;
731    }
732
733    /** Close the map. */
734    @Override
735    public synchronized void close() throws IOException {
736      if (!indexClosed) {
737        index.close();
738      }
739      data.close();
740    }
741
742  }
743
744  /** Renames an existing map directory. */
745  public static void rename(FileSystem fs, String oldName, String newName)
746    throws IOException {
747    Path oldDir = new Path(oldName);
748    Path newDir = new Path(newName);
749    if (!fs.rename(oldDir, newDir)) {
750      throw new IOException("Could not rename " + oldDir + " to " + newDir);
751    }
752  }
753
754  /** Deletes the named map file. */
755  public static void delete(FileSystem fs, String name) throws IOException {
756    Path dir = new Path(name);
757    Path data = new Path(dir, DATA_FILE_NAME);
758    Path index = new Path(dir, INDEX_FILE_NAME);
759
760    fs.delete(data, true);
761    fs.delete(index, true);
762    fs.delete(dir, true);
763  }
764
765  /**
766   * This method attempts to fix a corrupt MapFile by re-creating its index.
767   * @param fs filesystem
768   * @param dir directory containing the MapFile data and index
769   * @param keyClass key class (has to be a subclass of Writable)
770   * @param valueClass value class (has to be a subclass of Writable)
771   * @param dryrun do not perform any changes, just report what needs to be done
772   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
773   * @throws Exception
774   */
775  public static long fix(FileSystem fs, Path dir,
776                         Class<? extends Writable> keyClass,
777                         Class<? extends Writable> valueClass, boolean dryrun,
778                         Configuration conf) throws Exception {
779    String dr = (dryrun ? "[DRY RUN ] " : "");
780    Path data = new Path(dir, DATA_FILE_NAME);
781    Path index = new Path(dir, INDEX_FILE_NAME);
782    int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
783    if (!fs.exists(data)) {
784      // there's nothing we can do to fix this!
785      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
786    }
787    if (fs.exists(index)) {
788      // no fixing needed
789      return -1;
790    }
791    SequenceFile.Reader dataReader = 
792      new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
793    if (!dataReader.getKeyClass().equals(keyClass)) {
794      throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
795                          ", got " + dataReader.getKeyClass().getName());
796    }
797    if (!dataReader.getValueClass().equals(valueClass)) {
798      throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
799                          ", got " + dataReader.getValueClass().getName());
800    }
801    long cnt = 0L;
802    Writable key = ReflectionUtils.newInstance(keyClass, conf);
803    Writable value = ReflectionUtils.newInstance(valueClass, conf);
804    SequenceFile.Writer indexWriter = null;
805    if (!dryrun) {
806      indexWriter = 
807        SequenceFile.createWriter(conf, 
808                                  SequenceFile.Writer.file(index), 
809                                  SequenceFile.Writer.keyClass(keyClass), 
810                                  SequenceFile.Writer.valueClass
811                                    (LongWritable.class));
812    }
813    try {
814      long pos = 0L;
815      LongWritable position = new LongWritable();
816      while(dataReader.next(key, value)) {
817        cnt++;
818        if (cnt % indexInterval == 0) {
819          position.set(pos);
820          if (!dryrun) indexWriter.append(key, position);
821        }
822        pos = dataReader.getPosition();
823      }
824    } catch(Throwable t) {
825      // truncated data file. swallow it.
826    }
827    dataReader.close();
828    if (!dryrun) indexWriter.close();
829    return cnt;
830  }
831
832  /**
833   * Class to merge multiple MapFiles of same Key and Value types to one MapFile
834   */
835  public static class Merger {
836    private Configuration conf;
837    private WritableComparator comparator = null;
838    private Reader[] inReaders;
839    private Writer outWriter;
840    private Class<Writable> valueClass = null;
841    private Class<WritableComparable> keyClass = null;
842
843    public Merger(Configuration conf) throws IOException {
844      this.conf = conf;
845    }
846
847    /**
848     * Merge multiple MapFiles to one Mapfile
849     *
850     * @param inMapFiles
851     * @param outMapFile
852     * @throws IOException
853     */
854    public void merge(Path[] inMapFiles, boolean deleteInputs,
855        Path outMapFile) throws IOException {
856      try {
857        open(inMapFiles, outMapFile);
858        mergePass();
859      } finally {
860        close();
861      }
862      if (deleteInputs) {
863        for (int i = 0; i < inMapFiles.length; i++) {
864          Path path = inMapFiles[i];
865          delete(path.getFileSystem(conf), path.toString());
866        }
867      }
868    }
869
870    /*
871     * Open all input files for reading and verify the key and value types. And
872     * open Output file for writing
873     */
874    @SuppressWarnings("unchecked")
875    private void open(Path[] inMapFiles, Path outMapFile) throws IOException {
876      inReaders = new Reader[inMapFiles.length];
877      for (int i = 0; i < inMapFiles.length; i++) {
878        Reader reader = new Reader(inMapFiles[i], conf);
879        if (keyClass == null || valueClass == null) {
880          keyClass = (Class<WritableComparable>) reader.getKeyClass();
881          valueClass = (Class<Writable>) reader.getValueClass();
882        } else if (keyClass != reader.getKeyClass()
883            || valueClass != reader.getValueClass()) {
884          throw new HadoopIllegalArgumentException(
885              "Input files cannot be merged as they"
886                  + " have different Key and Value classes");
887        }
888        inReaders[i] = reader;
889      }
890
891      if (comparator == null) {
892        Class<? extends WritableComparable> cls;
893        cls = keyClass.asSubclass(WritableComparable.class);
894        this.comparator = WritableComparator.get(cls, conf);
895      } else if (comparator.getKeyClass() != keyClass) {
896        throw new HadoopIllegalArgumentException(
897            "Input files cannot be merged as they"
898                + " have different Key class compared to"
899                + " specified comparator");
900      }
901
902      outWriter = new MapFile.Writer(conf, outMapFile,
903          MapFile.Writer.keyClass(keyClass),
904          MapFile.Writer.valueClass(valueClass));
905    }
906
907    /**
908     * Merge all input files to output map file.<br>
909     * 1. Read first key/value from all input files to keys/values array. <br>
910     * 2. Select the least key and corresponding value. <br>
911     * 3. Write the selected key and value to output file. <br>
912     * 4. Replace the already written key/value in keys/values arrays with the
913     * next key/value from the selected input <br>
914     * 5. Repeat step 2-4 till all keys are read. <br>
915     */
916    private void mergePass() throws IOException {
917      // re-usable array
918      WritableComparable[] keys = new WritableComparable[inReaders.length];
919      Writable[] values = new Writable[inReaders.length];
920      // Read first key/value from all inputs
921      for (int i = 0; i < inReaders.length; i++) {
922        keys[i] = ReflectionUtils.newInstance(keyClass, null);
923        values[i] = ReflectionUtils.newInstance(valueClass, null);
924        if (!inReaders[i].next(keys[i], values[i])) {
925          // Handle empty files
926          keys[i] = null;
927          values[i] = null;
928        }
929      }
930
931      do {
932        int currentEntry = -1;
933        WritableComparable currentKey = null;
934        Writable currentValue = null;
935        for (int i = 0; i < keys.length; i++) {
936          if (keys[i] == null) {
937            // Skip Readers reached EOF
938            continue;
939          }
940          if (currentKey == null || comparator.compare(currentKey, keys[i]) > 0) {
941            currentEntry = i;
942            currentKey = keys[i];
943            currentValue = values[i];
944          }
945        }
946        if (currentKey == null) {
947          // Merge Complete
948          break;
949        }
950        // Write the selected key/value to merge stream
951        outWriter.append(currentKey, currentValue);
952        // Replace the already written key/value in keys/values arrays with the
953        // next key/value from the selected input
954        if (!inReaders[currentEntry].next(keys[currentEntry],
955            values[currentEntry])) {
956          // EOF for this file
957          keys[currentEntry] = null;
958          values[currentEntry] = null;
959        }
960      } while (true);
961    }
962
963    private void close() throws IOException {
964      for (int i = 0; i < inReaders.length; i++) {
965        IOUtils.closeStream(inReaders[i]);
966        inReaders[i] = null;
967      }
968      if (outWriter != null) {
969        outWriter.close();
970        outWriter = null;
971      }
972    }
973  }
974
975  public static void main(String[] args) throws Exception {
976    String usage = "Usage: MapFile inFile outFile";
977      
978    if (args.length != 2) {
979      System.err.println(usage);
980      System.exit(-1);
981    }
982      
983    String in = args[0];
984    String out = args[1];
985
986    Configuration conf = new Configuration();
987    FileSystem fs = FileSystem.getLocal(conf);
988    MapFile.Reader reader = null;
989    MapFile.Writer writer = null;
990    try {
991      reader = new MapFile.Reader(fs, in, conf);
992      writer =
993        new MapFile.Writer(conf, fs, out,
994            reader.getKeyClass().asSubclass(WritableComparable.class),
995            reader.getValueClass());
996
997      WritableComparable key = ReflectionUtils.newInstance(reader.getKeyClass()
998        .asSubclass(WritableComparable.class), conf);
999      Writable value = ReflectionUtils.newInstance(reader.getValueClass()
1000        .asSubclass(Writable.class), conf);
1001
1002      while (reader.next(key, value))               // copy all entries
1003        writer.append(key, value);
1004    } finally {
1005      IOUtils.cleanup(LOG, writer, reader);
1006    }
1007  }
1008}