001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io;
020    
021    import java.io.EOFException;
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.Arrays;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.Configuration;
031    import org.apache.hadoop.fs.FileSystem;
032    import org.apache.hadoop.fs.Path;
033    import org.apache.hadoop.io.IOUtils;
034    import org.apache.hadoop.io.SequenceFile.CompressionType;
035    import org.apache.hadoop.io.compress.CompressionCodec;
036    import org.apache.hadoop.util.Options;
037    import org.apache.hadoop.util.Progressable;
038    import org.apache.hadoop.util.ReflectionUtils;
039    
040    /** A file-based map from keys to values.
041     * 
042     * <p>A map is a directory containing two files, the <code>data</code> file,
043     * containing all keys and values in the map, and a smaller <code>index</code>
044     * file, containing a fraction of the keys.  The fraction is determined by
045     * {@link Writer#getIndexInterval()}.
046     *
047     * <p>The index file is read entirely into memory.  Thus key implementations
048     * should try to keep themselves small.
049     *
050     * <p>Map files are created by adding entries in-order.  To maintain a large
051     * database, perform updates by copying the previous version of a database and
052     * merging in a sorted change list, to create a new version of the database in
053     * a new file.  Sorting large change lists can be done with {@link
054     * SequenceFile.Sorter}.
055     */
056    @InterfaceAudience.Public
057    @InterfaceStability.Stable
058    public class MapFile {
  /** Logger for this class; nested classes log through it as well. */
  private static final Log LOG = LogFactory.getLog(MapFile.class);

  /** The name of the index file inside a map directory. */
  public static final String INDEX_FILE_NAME = "index";

  /** The name of the data file inside a map directory. */
  public static final String DATA_FILE_NAME = "data";

  /**
   * Not publicly constructible; the constructor is protected, so only
   * subclasses and same-package code can invoke it.
   */
  protected MapFile() {}                          // no public ctor
068    
069      /** Writes a new map. */
070      public static class Writer implements java.io.Closeable {
071        private SequenceFile.Writer data;
072        private SequenceFile.Writer index;
073    
074        final private static String INDEX_INTERVAL = "io.map.index.interval";
075        private int indexInterval = 128;
076    
077        private long size;
078        private LongWritable position = new LongWritable();
079    
080        // the following fields are used only for checking key order
081        private WritableComparator comparator;
082        private DataInputBuffer inBuf = new DataInputBuffer();
083        private DataOutputBuffer outBuf = new DataOutputBuffer();
084        private WritableComparable lastKey;
085    
086        /** What's the position (in bytes) we wrote when we got the last index */
087        private long lastIndexPos = -1;
088    
089        /**
090         * What was size when we last wrote an index. Set to MIN_VALUE to ensure that
091         * we have an index at position zero -- midKey will throw an exception if this
092         * is not the case
093         */
094        private long lastIndexKeyCount = Long.MIN_VALUE;
095    
096    
097        /** Create the named map for keys of the named class. 
098         * @deprecated Use Writer(Configuration, Path, Option...) instead.
099         */
100        @Deprecated
101        public Writer(Configuration conf, FileSystem fs, String dirName,
102                      Class<? extends WritableComparable> keyClass, 
103                      Class valClass) throws IOException {
104          this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
105        }
106    
107        /** Create the named map for keys of the named class. 
108         * @deprecated Use Writer(Configuration, Path, Option...) instead.
109         */
110        @Deprecated
111        public Writer(Configuration conf, FileSystem fs, String dirName,
112                      Class<? extends WritableComparable> keyClass, Class valClass,
113                      CompressionType compress, 
114                      Progressable progress) throws IOException {
115          this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
116               compression(compress), progressable(progress));
117        }
118    
119        /** Create the named map for keys of the named class. 
120         * @deprecated Use Writer(Configuration, Path, Option...) instead.
121         */
122        @Deprecated
123        public Writer(Configuration conf, FileSystem fs, String dirName,
124                      Class<? extends WritableComparable> keyClass, Class valClass,
125                      CompressionType compress, CompressionCodec codec,
126                      Progressable progress) throws IOException {
127          this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
128               compression(compress, codec), progressable(progress));
129        }
130    
131        /** Create the named map for keys of the named class. 
132         * @deprecated Use Writer(Configuration, Path, Option...) instead.
133         */
134        @Deprecated
135        public Writer(Configuration conf, FileSystem fs, String dirName,
136                      Class<? extends WritableComparable> keyClass, Class valClass,
137                      CompressionType compress) throws IOException {
138          this(conf, new Path(dirName), keyClass(keyClass),
139               valueClass(valClass), compression(compress));
140        }
141    
142        /** Create the named map using the named key comparator. 
143         * @deprecated Use Writer(Configuration, Path, Option...) instead.
144         */
145        @Deprecated
146        public Writer(Configuration conf, FileSystem fs, String dirName,
147                      WritableComparator comparator, Class valClass
148                      ) throws IOException {
149          this(conf, new Path(dirName), comparator(comparator), 
150               valueClass(valClass));
151        }
152    
153        /** Create the named map using the named key comparator. 
154         * @deprecated Use Writer(Configuration, Path, Option...) instead.
155         */
156        @Deprecated
157        public Writer(Configuration conf, FileSystem fs, String dirName,
158                      WritableComparator comparator, Class valClass,
159                      SequenceFile.CompressionType compress) throws IOException {
160          this(conf, new Path(dirName), comparator(comparator),
161               valueClass(valClass), compression(compress));
162        }
163    
164        /** Create the named map using the named key comparator. 
165         * @deprecated Use Writer(Configuration, Path, Option...)} instead.
166         */
167        @Deprecated
168        public Writer(Configuration conf, FileSystem fs, String dirName,
169                      WritableComparator comparator, Class valClass,
170                      SequenceFile.CompressionType compress,
171                      Progressable progress) throws IOException {
172          this(conf, new Path(dirName), comparator(comparator),
173               valueClass(valClass), compression(compress),
174               progressable(progress));
175        }
176    
177        /** Create the named map using the named key comparator. 
178         * @deprecated Use Writer(Configuration, Path, Option...) instead.
179         */
180        @Deprecated
181        public Writer(Configuration conf, FileSystem fs, String dirName,
182                      WritableComparator comparator, Class valClass,
183                      SequenceFile.CompressionType compress, CompressionCodec codec,
184                      Progressable progress) throws IOException {
185          this(conf, new Path(dirName), comparator(comparator),
186               valueClass(valClass), compression(compress, codec),
187               progressable(progress));
188        }
189        
190        // our options are a superset of sequence file writer options
191        public static interface Option extends SequenceFile.Writer.Option { }
192        
193        private static class KeyClassOption extends Options.ClassOption
194                                            implements Option {
195          KeyClassOption(Class<?> value) {
196            super(value);
197          }
198        }
199        
200        private static class ComparatorOption implements Option {
201          private final WritableComparator value;
202          ComparatorOption(WritableComparator value) {
203            this.value = value;
204          }
205          WritableComparator getValue() {
206            return value;
207          }
208        }
209    
210        public static Option keyClass(Class<? extends WritableComparable> value) {
211          return new KeyClassOption(value);
212        }
213        
214        public static Option comparator(WritableComparator value) {
215          return new ComparatorOption(value);
216        }
217    
218        public static SequenceFile.Writer.Option valueClass(Class<?> value) {
219          return SequenceFile.Writer.valueClass(value);
220        }
221        
222        public static 
223        SequenceFile.Writer.Option compression(CompressionType type) {
224          return SequenceFile.Writer.compression(type);
225        }
226    
227        public static 
228        SequenceFile.Writer.Option compression(CompressionType type,
229            CompressionCodec codec) {
230          return SequenceFile.Writer.compression(type, codec);
231        }
232    
233        public static SequenceFile.Writer.Option progressable(Progressable value) {
234          return SequenceFile.Writer.progressable(value);
235        }
236    
237        @SuppressWarnings("unchecked")
238        public Writer(Configuration conf, 
239                      Path dirName,
240                      SequenceFile.Writer.Option... opts
241                      ) throws IOException {
242          KeyClassOption keyClassOption = 
243            Options.getOption(KeyClassOption.class, opts);
244          ComparatorOption comparatorOption =
245            Options.getOption(ComparatorOption.class, opts);
246          if ((keyClassOption == null) == (comparatorOption == null)) {
247            throw new IllegalArgumentException("key class or comparator option "
248                                               + "must be set");
249          }
250          this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
251    
252          Class<? extends WritableComparable> keyClass;
253          if (keyClassOption == null) {
254            this.comparator = comparatorOption.getValue();
255            keyClass = comparator.getKeyClass();
256          } else {
257            keyClass= 
258              (Class<? extends WritableComparable>) keyClassOption.getValue();
259            this.comparator = WritableComparator.get(keyClass, conf);
260          }
261          this.lastKey = comparator.newKey();
262          FileSystem fs = dirName.getFileSystem(conf);
263    
264          if (!fs.mkdirs(dirName)) {
265            throw new IOException("Mkdirs failed to create directory " + dirName);
266          }
267          Path dataFile = new Path(dirName, DATA_FILE_NAME);
268          Path indexFile = new Path(dirName, INDEX_FILE_NAME);
269    
270          SequenceFile.Writer.Option[] dataOptions =
271            Options.prependOptions(opts, 
272                                   SequenceFile.Writer.file(dataFile),
273                                   SequenceFile.Writer.keyClass(keyClass));
274          this.data = SequenceFile.createWriter(conf, dataOptions);
275    
276          SequenceFile.Writer.Option[] indexOptions =
277            Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
278                SequenceFile.Writer.keyClass(keyClass),
279                SequenceFile.Writer.valueClass(LongWritable.class),
280                SequenceFile.Writer.compression(CompressionType.BLOCK));
281          this.index = SequenceFile.createWriter(conf, indexOptions);      
282        }
283    
284        /** The number of entries that are added before an index entry is added.*/
285        public int getIndexInterval() { return indexInterval; }
286    
287        /** Sets the index interval.
288         * @see #getIndexInterval()
289         */
290        public void setIndexInterval(int interval) { indexInterval = interval; }
291    
292        /** Sets the index interval and stores it in conf
293         * @see #getIndexInterval()
294         */
295        public static void setIndexInterval(Configuration conf, int interval) {
296          conf.setInt(INDEX_INTERVAL, interval);
297        }
298    
299        /** Close the map. */
300        @Override
301        public synchronized void close() throws IOException {
302          data.close();
303          index.close();
304        }
305    
306        /** Append a key/value pair to the map.  The key must be greater or equal
307         * to the previous key added to the map. */
308        public synchronized void append(WritableComparable key, Writable val)
309          throws IOException {
310    
311          checkKey(key);
312    
313          long pos = data.getLength();      
314          // Only write an index if we've changed positions. In a block compressed
315          // file, this means we write an entry at the start of each block      
316          if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
317            position.set(pos);                        // point to current eof
318            index.append(key, position);
319            lastIndexPos = pos;
320            lastIndexKeyCount = size;
321          }
322    
323          data.append(key, val);                      // append key/value to data
324          size++;
325        }
326    
327        private void checkKey(WritableComparable key) throws IOException {
328          // check that keys are well-ordered
329          if (size != 0 && comparator.compare(lastKey, key) > 0)
330            throw new IOException("key out of order: "+key+" after "+lastKey);
331              
332          // update lastKey with a copy of key by writing and reading
333          outBuf.reset();
334          key.write(outBuf);                          // write new key
335    
336          inBuf.reset(outBuf.getData(), outBuf.getLength());
337          lastKey.readFields(inBuf);                  // read into lastKey
338        }
339    
340      }
341      
342      /** Provide access to an existing map. */
343      public static class Reader implements java.io.Closeable {
344          
345        /** Number of index entries to skip between each entry.  Zero by default.
346         * Setting this to values larger than zero can facilitate opening large map
347         * files using less memory. */
348        private int INDEX_SKIP = 0;
349          
350        private WritableComparator comparator;
351    
352        private WritableComparable nextKey;
353        private long seekPosition = -1;
354        private int seekIndex = -1;
355        private long firstPosition;
356    
357        // the data, on disk
358        private SequenceFile.Reader data;
359        private SequenceFile.Reader index;
360    
361        // whether the index Reader was closed
362        private boolean indexClosed = false;
363    
364        // the index, in memory
365        private int count = -1;
366        private WritableComparable[] keys;
367        private long[] positions;
368    
369        /** Returns the class of keys in this file. */
370        public Class<?> getKeyClass() { return data.getKeyClass(); }
371    
372        /** Returns the class of values in this file. */
373        public Class<?> getValueClass() { return data.getValueClass(); }
374    
375        public static interface Option extends SequenceFile.Reader.Option {}
376        
377        public static Option comparator(WritableComparator value) {
378          return new ComparatorOption(value);
379        }
380    
381        static class ComparatorOption implements Option {
382          private final WritableComparator value;
383          ComparatorOption(WritableComparator value) {
384            this.value = value;
385          }
386          WritableComparator getValue() {
387            return value;
388          }
389        }
390    
391        public Reader(Path dir, Configuration conf,
392                      SequenceFile.Reader.Option... opts) throws IOException {
393          ComparatorOption comparatorOption = 
394            Options.getOption(ComparatorOption.class, opts);
395          WritableComparator comparator =
396            comparatorOption == null ? null : comparatorOption.getValue();
397          INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
398          open(dir, comparator, conf, opts);
399        }
400     
401        /** Construct a map reader for the named map.
402         * @deprecated
403         */
404        @Deprecated
405        public Reader(FileSystem fs, String dirName, 
406                      Configuration conf) throws IOException {
407          this(new Path(dirName), conf);
408        }
409    
410        /** Construct a map reader for the named map using the named comparator.
411         * @deprecated
412         */
413        @Deprecated
414        public Reader(FileSystem fs, String dirName, WritableComparator comparator, 
415                      Configuration conf) throws IOException {
416          this(new Path(dirName), conf, comparator(comparator));
417        }
418        
419        protected synchronized void open(Path dir,
420                                         WritableComparator comparator,
421                                         Configuration conf, 
422                                         SequenceFile.Reader.Option... options
423                                         ) throws IOException {
424          Path dataFile = new Path(dir, DATA_FILE_NAME);
425          Path indexFile = new Path(dir, INDEX_FILE_NAME);
426    
427          // open the data
428          this.data = createDataFileReader(dataFile, conf, options);
429          this.firstPosition = data.getPosition();
430    
431          if (comparator == null) {
432            Class<? extends WritableComparable> cls;
433            cls = data.getKeyClass().asSubclass(WritableComparable.class);
434            this.comparator = WritableComparator.get(cls, conf);
435          } else {
436            this.comparator = comparator;
437          }
438    
439          // open the index
440          SequenceFile.Reader.Option[] indexOptions =
441            Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
442          this.index = new SequenceFile.Reader(conf, indexOptions);
443        }
444    
445        /**
446         * Override this method to specialize the type of
447         * {@link SequenceFile.Reader} returned.
448         */
449        protected SequenceFile.Reader 
450          createDataFileReader(Path dataFile, Configuration conf,
451                               SequenceFile.Reader.Option... options
452                               ) throws IOException {
453          SequenceFile.Reader.Option[] newOptions =
454            Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
455          return new SequenceFile.Reader(conf, newOptions);
456        }
457    
458        private void readIndex() throws IOException {
459          // read the index entirely into memory
460          if (this.keys != null)
461            return;
462          this.count = 0;
463          this.positions = new long[1024];
464    
465          try {
466            int skip = INDEX_SKIP;
467            LongWritable position = new LongWritable();
468            WritableComparable lastKey = null;
469            long lastIndex = -1;
470            ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024);
471            while (true) {
472              WritableComparable k = comparator.newKey();
473    
474              if (!index.next(k, position))
475                break;
476    
477              // check order to make sure comparator is compatible
478              if (lastKey != null && comparator.compare(lastKey, k) > 0)
479                throw new IOException("key out of order: "+k+" after "+lastKey);
480              lastKey = k;
481              if (skip > 0) {
482                skip--;
483                continue;                             // skip this entry
484              } else {
485                skip = INDEX_SKIP;                    // reset skip
486              }
487    
488              // don't read an index that is the same as the previous one. Block
489              // compressed map files used to do this (multiple entries would point
490              // at the same block)
491              if (position.get() == lastIndex)
492                continue;
493    
494              if (count == positions.length) {
495                positions = Arrays.copyOf(positions, positions.length * 2);
496              }
497    
498              keyBuilder.add(k);
499              positions[count] = position.get();
500              count++;
501            }
502    
503            this.keys = keyBuilder.toArray(new WritableComparable[count]);
504            positions = Arrays.copyOf(positions, count);
505          } catch (EOFException e) {
506            LOG.warn("Unexpected EOF reading " + index +
507                                  " at entry #" + count + ".  Ignoring.");
508          } finally {
509            indexClosed = true;
510            index.close();
511          }
512        }
513    
514        /** Re-positions the reader before its first key. */
515        public synchronized void reset() throws IOException {
516          data.seek(firstPosition);
517        }
518    
519        /** Get the key at approximately the middle of the file. Or null if the
520         *  file is empty. 
521         */
522        public synchronized WritableComparable midKey() throws IOException {
523    
524          readIndex();
525          if (count == 0) {
526            return null;
527          }
528        
529          return keys[(count - 1) / 2];
530        }
531        
532        /** Reads the final key from the file.
533         *
534         * @param key key to read into
535         */
536        public synchronized void finalKey(WritableComparable key)
537          throws IOException {
538    
539          long originalPosition = data.getPosition(); // save position
540          try {
541            readIndex();                              // make sure index is valid
542            if (count > 0) {
543              data.seek(positions[count-1]);          // skip to last indexed entry
544            } else {
545              reset();                                // start at the beginning
546            }
547            while (data.next(key)) {}                 // scan to eof
548    
549          } finally {
550            data.seek(originalPosition);              // restore position
551          }
552        }
553    
554        /** Positions the reader at the named key, or if none such exists, at the
555         * first entry after the named key.  Returns true iff the named key exists
556         * in this map.
557         */
558        public synchronized boolean seek(WritableComparable key) throws IOException {
559          return seekInternal(key) == 0;
560        }
561    
562        /** 
563         * Positions the reader at the named key, or if none such exists, at the
564         * first entry after the named key.
565         *
566         * @return  0   - exact match found
567         *          < 0 - positioned at next record
568         *          1   - no more records in file
569         */
570        private synchronized int seekInternal(WritableComparable key)
571          throws IOException {
572          return seekInternal(key, false);
573        }
574    
575        /** 
576         * Positions the reader at the named key, or if none such exists, at the
577         * key that falls just before or just after dependent on how the
578         * <code>before</code> parameter is set.
579         * 
580         * @param before - IF true, and <code>key</code> does not exist, position
581         * file at entry that falls just before <code>key</code>.  Otherwise,
582         * position file at record that sorts just after.
583         * @return  0   - exact match found
584         *          < 0 - positioned at next record
585         *          1   - no more records in file
586         */
587        private synchronized int seekInternal(WritableComparable key,
588            final boolean before)
589          throws IOException {
590          readIndex();                                // make sure index is read
591    
592          if (seekIndex != -1                         // seeked before
593              && seekIndex+1 < count           
594              && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
595              && comparator.compare(key, nextKey)
596              >= 0) {                                 // but after last seeked
597            // do nothing
598          } else {
599            seekIndex = binarySearch(key);
600            if (seekIndex < 0)                        // decode insertion point
601              seekIndex = -seekIndex-2;
602    
603            if (seekIndex == -1)                      // belongs before first entry
604              seekPosition = firstPosition;           // use beginning of file
605            else
606              seekPosition = positions[seekIndex];    // else use index
607          }
608          data.seek(seekPosition);
609          
610          if (nextKey == null)
611            nextKey = comparator.newKey();
612         
613          // If we're looking for the key before, we need to keep track
614          // of the position we got the current key as well as the position
615          // of the key before it.
616          long prevPosition = -1;
617          long curPosition = seekPosition;
618    
619          while (data.next(nextKey)) {
620            int c = comparator.compare(key, nextKey);
621            if (c <= 0) {                             // at or beyond desired
622              if (before && c != 0) {
623                if (prevPosition == -1) {
624                  // We're on the first record of this index block
625                  // and we've already passed the search key. Therefore
626                  // we must be at the beginning of the file, so seek
627                  // to the beginning of this block and return c
628                  data.seek(curPosition);
629                } else {
630                  // We have a previous record to back up to
631                  data.seek(prevPosition);
632                  data.next(nextKey);
633                  // now that we've rewound, the search key must be greater than this key
634                  return 1;
635                }
636              }
637              return c;
638            }
639            if (before) {
640              prevPosition = curPosition;
641              curPosition = data.getPosition();
642            }
643          }
644    
645          return 1;
646        }
647    
648        private int binarySearch(WritableComparable key) {
649          int low = 0;
650          int high = count-1;
651    
652          while (low <= high) {
653            int mid = (low + high) >>> 1;
654            WritableComparable midVal = keys[mid];
655            int cmp = comparator.compare(midVal, key);
656    
657            if (cmp < 0)
658              low = mid + 1;
659            else if (cmp > 0)
660              high = mid - 1;
661            else
662              return mid;                             // key found
663          }
664          return -(low + 1);                          // key not found.
665        }
666    
667        /** Read the next key/value pair in the map into <code>key</code> and
668         * <code>val</code>.  Returns true if such a pair exists and false when at
669         * the end of the map */
670        public synchronized boolean next(WritableComparable key, Writable val)
671          throws IOException {
672          return data.next(key, val);
673        }
674    
675        /** Return the value for the named key, or null if none exists. */
676        public synchronized Writable get(WritableComparable key, Writable val)
677          throws IOException {
678          if (seek(key)) {
679            data.getCurrentValue(val);
680            return val;
681          } else
682            return null;
683        }
684    
685        /** 
686         * Finds the record that is the closest match to the specified key.
687         * Returns <code>key</code> or if it does not exist, at the first entry
688         * after the named key.
689         * 
690    -     * @param key       - key that we're trying to find
691    -     * @param val       - data value if key is found
692    -     * @return          - the key that was the closest match or null if eof.
693         */
694        public synchronized WritableComparable getClosest(WritableComparable key,
695          Writable val)
696        throws IOException {
697          return getClosest(key, val, false);
698        }
699    
700        /** 
701         * Finds the record that is the closest match to the specified key.
702         * 
703         * @param key       - key that we're trying to find
704         * @param val       - data value if key is found
705         * @param before    - IF true, and <code>key</code> does not exist, return
706         * the first entry that falls just before the <code>key</code>.  Otherwise,
707         * return the record that sorts just after.
708         * @return          - the key that was the closest match or null if eof.
709         */
710        public synchronized WritableComparable getClosest(WritableComparable key,
711            Writable val, final boolean before)
712          throws IOException {
713         
714          int c = seekInternal(key, before);
715    
716          // If we didn't get an exact match, and we ended up in the wrong
717          // direction relative to the query key, return null since we
718          // must be at the beginning or end of the file.
719          if ((!before && c > 0) ||
720              (before && c < 0)) {
721            return null;
722          }
723    
724          data.getCurrentValue(val);
725          return nextKey;
726        }
727    
728        /** Close the map. */
729        @Override
730        public synchronized void close() throws IOException {
731          if (!indexClosed) {
732            index.close();
733          }
734          data.close();
735        }
736    
737      }
738    
739      /** Renames an existing map directory. */
740      public static void rename(FileSystem fs, String oldName, String newName)
741        throws IOException {
742        Path oldDir = new Path(oldName);
743        Path newDir = new Path(newName);
744        if (!fs.rename(oldDir, newDir)) {
745          throw new IOException("Could not rename " + oldDir + " to " + newDir);
746        }
747      }
748    
749      /** Deletes the named map file. */
750      public static void delete(FileSystem fs, String name) throws IOException {
751        Path dir = new Path(name);
752        Path data = new Path(dir, DATA_FILE_NAME);
753        Path index = new Path(dir, INDEX_FILE_NAME);
754    
755        fs.delete(data, true);
756        fs.delete(index, true);
757        fs.delete(dir, true);
758      }
759    
760      /**
761       * This method attempts to fix a corrupt MapFile by re-creating its index.
762       * @param fs filesystem
763       * @param dir directory containing the MapFile data and index
764       * @param keyClass key class (has to be a subclass of Writable)
765       * @param valueClass value class (has to be a subclass of Writable)
766       * @param dryrun do not perform any changes, just report what needs to be done
767       * @return number of valid entries in this MapFile, or -1 if no fixing was needed
768       * @throws Exception
769       */
770      public static long fix(FileSystem fs, Path dir,
771                             Class<? extends Writable> keyClass,
772                             Class<? extends Writable> valueClass, boolean dryrun,
773                             Configuration conf) throws Exception {
774        String dr = (dryrun ? "[DRY RUN ] " : "");
775        Path data = new Path(dir, DATA_FILE_NAME);
776        Path index = new Path(dir, INDEX_FILE_NAME);
777        int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
778        if (!fs.exists(data)) {
779          // there's nothing we can do to fix this!
780          throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
781        }
782        if (fs.exists(index)) {
783          // no fixing needed
784          return -1;
785        }
786        SequenceFile.Reader dataReader = 
787          new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
788        if (!dataReader.getKeyClass().equals(keyClass)) {
789          throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
790                              ", got " + dataReader.getKeyClass().getName());
791        }
792        if (!dataReader.getValueClass().equals(valueClass)) {
793          throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
794                              ", got " + dataReader.getValueClass().getName());
795        }
796        long cnt = 0L;
797        Writable key = ReflectionUtils.newInstance(keyClass, conf);
798        Writable value = ReflectionUtils.newInstance(valueClass, conf);
799        SequenceFile.Writer indexWriter = null;
800        if (!dryrun) {
801          indexWriter = 
802            SequenceFile.createWriter(conf, 
803                                      SequenceFile.Writer.file(index), 
804                                      SequenceFile.Writer.keyClass(keyClass), 
805                                      SequenceFile.Writer.valueClass
806                                        (LongWritable.class));
807        }
808        try {
809          long pos = 0L;
810          LongWritable position = new LongWritable();
811          while(dataReader.next(key, value)) {
812            cnt++;
813            if (cnt % indexInterval == 0) {
814              position.set(pos);
815              if (!dryrun) indexWriter.append(key, position);
816            }
817            pos = dataReader.getPosition();
818          }
819        } catch(Throwable t) {
820          // truncated data file. swallow it.
821        }
822        dataReader.close();
823        if (!dryrun) indexWriter.close();
824        return cnt;
825      }
826    
827    
828      public static void main(String[] args) throws Exception {
829        String usage = "Usage: MapFile inFile outFile";
830          
831        if (args.length != 2) {
832          System.err.println(usage);
833          System.exit(-1);
834        }
835          
836        String in = args[0];
837        String out = args[1];
838    
839        Configuration conf = new Configuration();
840        FileSystem fs = FileSystem.getLocal(conf);
841        MapFile.Reader reader = null;
842        MapFile.Writer writer = null;
843        try {
844          reader = new MapFile.Reader(fs, in, conf);
845          writer =
846            new MapFile.Writer(conf, fs, out,
847                reader.getKeyClass().asSubclass(WritableComparable.class),
848                reader.getValueClass());
849    
850          WritableComparable key = ReflectionUtils.newInstance(reader.getKeyClass()
851            .asSubclass(WritableComparable.class), conf);
852          Writable value = ReflectionUtils.newInstance(reader.getValueClass()
853            .asSubclass(Writable.class), conf);
854    
855          while (reader.next(key, value))               // copy all entries
856            writer.append(key, value);
857        } finally {
858          IOUtils.cleanup(LOG, writer, reader);
859        }
860      }
861    }