001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.EOFException; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Arrays; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.apache.hadoop.classification.InterfaceAudience; 029import org.apache.hadoop.classification.InterfaceStability; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.io.SequenceFile.CompressionType; 034import org.apache.hadoop.io.compress.CompressionCodec; 035import org.apache.hadoop.util.Options; 036import org.apache.hadoop.util.Progressable; 037import org.apache.hadoop.util.ReflectionUtils; 038 039/** A file-based map from keys to values. 040 * 041 * <p>A map is a directory containing two files, the <code>data</code> file, 042 * containing all keys and values in the map, and a smaller <code>index</code> 043 * file, containing a fraction of the keys. The fraction is determined by 044 * {@link Writer#getIndexInterval()}. 045 * 046 * <p>The index file is read entirely into memory. Thus key implementations 047 * should try to keep themselves small. 048 * 049 * <p>Map files are created by adding entries in-order. To maintain a large 050 * database, perform updates by copying the previous version of a database and 051 * merging in a sorted change list, to create a new version of the database in 052 * a new file. Sorting large change lists can be done with {@link 053 * SequenceFile.Sorter}. 054 */ 055@InterfaceAudience.Public 056@InterfaceStability.Stable 057public class MapFile { 058 private static final Log LOG = LogFactory.getLog(MapFile.class); 059 060 /** The name of the index file. */ 061 public static final String INDEX_FILE_NAME = "index"; 062 063 /** The name of the data file. */ 064 public static final String DATA_FILE_NAME = "data"; 065 066 protected MapFile() {} // no public ctor 067 068 /** Writes a new map. */ 069 public static class Writer implements java.io.Closeable { 070 private SequenceFile.Writer data; 071 private SequenceFile.Writer index; 072 073 final private static String INDEX_INTERVAL = "io.map.index.interval"; 074 private int indexInterval = 128; 075 076 private long size; 077 private LongWritable position = new LongWritable(); 078 079 // the following fields are used only for checking key order 080 private WritableComparator comparator; 081 private DataInputBuffer inBuf = new DataInputBuffer(); 082 private DataOutputBuffer outBuf = new DataOutputBuffer(); 083 private WritableComparable lastKey; 084 085 /** What's the position (in bytes) we wrote when we got the last index */ 086 private long lastIndexPos = -1; 087 088 /** 089 * What was size when we last wrote an index. Set to MIN_VALUE to ensure that 090 * we have an index at position zero -- midKey will throw an exception if this 091 * is not the case 092 */ 093 private long lastIndexKeyCount = Long.MIN_VALUE; 094 095 096 /** Create the named map for keys of the named class. 097 * @deprecated Use Writer(Configuration, Path, Option...) instead. 098 */ 099 @Deprecated 100 public Writer(Configuration conf, FileSystem fs, String dirName, 101 Class<? extends WritableComparable> keyClass, 102 Class valClass) throws IOException { 103 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass)); 104 } 105 106 /** Create the named map for keys of the named class. 107 * @deprecated Use Writer(Configuration, Path, Option...) instead. 108 */ 109 @Deprecated 110 public Writer(Configuration conf, FileSystem fs, String dirName, 111 Class<? extends WritableComparable> keyClass, Class valClass, 112 CompressionType compress, 113 Progressable progress) throws IOException { 114 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 115 compression(compress), progressable(progress)); 116 } 117 118 /** Create the named map for keys of the named class. 119 * @deprecated Use Writer(Configuration, Path, Option...) instead. 120 */ 121 @Deprecated 122 public Writer(Configuration conf, FileSystem fs, String dirName, 123 Class<? extends WritableComparable> keyClass, Class valClass, 124 CompressionType compress, CompressionCodec codec, 125 Progressable progress) throws IOException { 126 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 127 compression(compress, codec), progressable(progress)); 128 } 129 130 /** Create the named map for keys of the named class. 131 * @deprecated Use Writer(Configuration, Path, Option...) instead. 132 */ 133 @Deprecated 134 public Writer(Configuration conf, FileSystem fs, String dirName, 135 Class<? extends WritableComparable> keyClass, Class valClass, 136 CompressionType compress) throws IOException { 137 this(conf, new Path(dirName), keyClass(keyClass), 138 valueClass(valClass), compression(compress)); 139 } 140 141 /** Create the named map using the named key comparator. 142 * @deprecated Use Writer(Configuration, Path, Option...) instead. 143 */ 144 @Deprecated 145 public Writer(Configuration conf, FileSystem fs, String dirName, 146 WritableComparator comparator, Class valClass 147 ) throws IOException { 148 this(conf, new Path(dirName), comparator(comparator), 149 valueClass(valClass)); 150 } 151 152 /** Create the named map using the named key comparator. 153 * @deprecated Use Writer(Configuration, Path, Option...) instead. 154 */ 155 @Deprecated 156 public Writer(Configuration conf, FileSystem fs, String dirName, 157 WritableComparator comparator, Class valClass, 158 SequenceFile.CompressionType compress) throws IOException { 159 this(conf, new Path(dirName), comparator(comparator), 160 valueClass(valClass), compression(compress)); 161 } 162 163 /** Create the named map using the named key comparator. 164 * @deprecated Use Writer(Configuration, Path, Option...)} instead. 165 */ 166 @Deprecated 167 public Writer(Configuration conf, FileSystem fs, String dirName, 168 WritableComparator comparator, Class valClass, 169 SequenceFile.CompressionType compress, 170 Progressable progress) throws IOException { 171 this(conf, new Path(dirName), comparator(comparator), 172 valueClass(valClass), compression(compress), 173 progressable(progress)); 174 } 175 176 /** Create the named map using the named key comparator. 177 * @deprecated Use Writer(Configuration, Path, Option...) instead. 178 */ 179 @Deprecated 180 public Writer(Configuration conf, FileSystem fs, String dirName, 181 WritableComparator comparator, Class valClass, 182 SequenceFile.CompressionType compress, CompressionCodec codec, 183 Progressable progress) throws IOException { 184 this(conf, new Path(dirName), comparator(comparator), 185 valueClass(valClass), compression(compress, codec), 186 progressable(progress)); 187 } 188 189 // our options are a superset of sequence file writer options 190 public static interface Option extends SequenceFile.Writer.Option { } 191 192 private static class KeyClassOption extends Options.ClassOption 193 implements Option { 194 KeyClassOption(Class<?> value) { 195 super(value); 196 } 197 } 198 199 private static class ComparatorOption implements Option { 200 private final WritableComparator value; 201 ComparatorOption(WritableComparator value) { 202 this.value = value; 203 } 204 WritableComparator getValue() { 205 return value; 206 } 207 } 208 209 public static Option keyClass(Class<? extends WritableComparable> value) { 210 return new KeyClassOption(value); 211 } 212 213 public static Option comparator(WritableComparator value) { 214 return new ComparatorOption(value); 215 } 216 217 public static SequenceFile.Writer.Option valueClass(Class<?> value) { 218 return SequenceFile.Writer.valueClass(value); 219 } 220 221 public static 222 SequenceFile.Writer.Option compression(CompressionType type) { 223 return SequenceFile.Writer.compression(type); 224 } 225 226 public static 227 SequenceFile.Writer.Option compression(CompressionType type, 228 CompressionCodec codec) { 229 return SequenceFile.Writer.compression(type, codec); 230 } 231 232 public static SequenceFile.Writer.Option progressable(Progressable value) { 233 return SequenceFile.Writer.progressable(value); 234 } 235 236 @SuppressWarnings("unchecked") 237 public Writer(Configuration conf, 238 Path dirName, 239 SequenceFile.Writer.Option... opts 240 ) throws IOException { 241 KeyClassOption keyClassOption = 242 Options.getOption(KeyClassOption.class, opts); 243 ComparatorOption comparatorOption = 244 Options.getOption(ComparatorOption.class, opts); 245 if ((keyClassOption == null) == (comparatorOption == null)) { 246 throw new IllegalArgumentException("key class or comparator option " 247 + "must be set"); 248 } 249 this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval); 250 251 Class<? extends WritableComparable> keyClass; 252 if (keyClassOption == null) { 253 this.comparator = comparatorOption.getValue(); 254 keyClass = comparator.getKeyClass(); 255 } else { 256 keyClass= 257 (Class<? extends WritableComparable>) keyClassOption.getValue(); 258 this.comparator = WritableComparator.get(keyClass); 259 } 260 this.lastKey = comparator.newKey(); 261 FileSystem fs = dirName.getFileSystem(conf); 262 263 if (!fs.mkdirs(dirName)) { 264 throw new IOException("Mkdirs failed to create directory " + dirName); 265 } 266 Path dataFile = new Path(dirName, DATA_FILE_NAME); 267 Path indexFile = new Path(dirName, INDEX_FILE_NAME); 268 269 SequenceFile.Writer.Option[] dataOptions = 270 Options.prependOptions(opts, 271 SequenceFile.Writer.file(dataFile), 272 SequenceFile.Writer.keyClass(keyClass)); 273 this.data = SequenceFile.createWriter(conf, dataOptions); 274 275 SequenceFile.Writer.Option[] indexOptions = 276 Options.prependOptions(opts, SequenceFile.Writer.file(indexFile), 277 SequenceFile.Writer.keyClass(keyClass), 278 SequenceFile.Writer.valueClass(LongWritable.class), 279 SequenceFile.Writer.compression(CompressionType.BLOCK)); 280 this.index = SequenceFile.createWriter(conf, indexOptions); 281 } 282 283 /** The number of entries that are added before an index entry is added.*/ 284 public int getIndexInterval() { return indexInterval; } 285 286 /** Sets the index interval. 287 * @see #getIndexInterval() 288 */ 289 public void setIndexInterval(int interval) { indexInterval = interval; } 290 291 /** Sets the index interval and stores it in conf 292 * @see #getIndexInterval() 293 */ 294 public static void setIndexInterval(Configuration conf, int interval) { 295 conf.setInt(INDEX_INTERVAL, interval); 296 } 297 298 /** Close the map. */ 299 public synchronized void close() throws IOException { 300 data.close(); 301 index.close(); 302 } 303 304 /** Append a key/value pair to the map. The key must be greater or equal 305 * to the previous key added to the map. */ 306 public synchronized void append(WritableComparable key, Writable val) 307 throws IOException { 308 309 checkKey(key); 310 311 long pos = data.getLength(); 312 // Only write an index if we've changed positions. In a block compressed 313 // file, this means we write an entry at the start of each block 314 if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) { 315 position.set(pos); // point to current eof 316 index.append(key, position); 317 lastIndexPos = pos; 318 lastIndexKeyCount = size; 319 } 320 321 data.append(key, val); // append key/value to data 322 size++; 323 } 324 325 private void checkKey(WritableComparable key) throws IOException { 326 // check that keys are well-ordered 327 if (size != 0 && comparator.compare(lastKey, key) > 0) 328 throw new IOException("key out of order: "+key+" after "+lastKey); 329 330 // update lastKey with a copy of key by writing and reading 331 outBuf.reset(); 332 key.write(outBuf); // write new key 333 334 inBuf.reset(outBuf.getData(), outBuf.getLength()); 335 lastKey.readFields(inBuf); // read into lastKey 336 } 337 338 } 339 340 /** Provide access to an existing map. */ 341 public static class Reader implements java.io.Closeable { 342 343 /** Number of index entries to skip between each entry. Zero by default. 344 * Setting this to values larger than zero can facilitate opening large map 345 * files using less memory. */ 346 private int INDEX_SKIP = 0; 347 348 private WritableComparator comparator; 349 350 private WritableComparable nextKey; 351 private long seekPosition = -1; 352 private int seekIndex = -1; 353 private long firstPosition; 354 355 // the data, on disk 356 private SequenceFile.Reader data; 357 private SequenceFile.Reader index; 358 359 // whether the index Reader was closed 360 private boolean indexClosed = false; 361 362 // the index, in memory 363 private int count = -1; 364 private WritableComparable[] keys; 365 private long[] positions; 366 367 /** Returns the class of keys in this file. */ 368 public Class<?> getKeyClass() { return data.getKeyClass(); } 369 370 /** Returns the class of values in this file. */ 371 public Class<?> getValueClass() { return data.getValueClass(); } 372 373 public static interface Option extends SequenceFile.Reader.Option {} 374 375 public static Option comparator(WritableComparator value) { 376 return new ComparatorOption(value); 377 } 378 379 static class ComparatorOption implements Option { 380 private final WritableComparator value; 381 ComparatorOption(WritableComparator value) { 382 this.value = value; 383 } 384 WritableComparator getValue() { 385 return value; 386 } 387 } 388 389 public Reader(Path dir, Configuration conf, 390 SequenceFile.Reader.Option... opts) throws IOException { 391 ComparatorOption comparatorOption = 392 Options.getOption(ComparatorOption.class, opts); 393 WritableComparator comparator = 394 comparatorOption == null ? null : comparatorOption.getValue(); 395 INDEX_SKIP = conf.getInt("io.map.index.skip", 0); 396 open(dir, comparator, conf, opts); 397 } 398 399 /** Construct a map reader for the named map. 400 * @deprecated 401 */ 402 @Deprecated 403 public Reader(FileSystem fs, String dirName, 404 Configuration conf) throws IOException { 405 this(new Path(dirName), conf); 406 } 407 408 /** Construct a map reader for the named map using the named comparator. 409 * @deprecated 410 */ 411 @Deprecated 412 public Reader(FileSystem fs, String dirName, WritableComparator comparator, 413 Configuration conf) throws IOException { 414 this(new Path(dirName), conf, comparator(comparator)); 415 } 416 417 protected synchronized void open(Path dir, 418 WritableComparator comparator, 419 Configuration conf, 420 SequenceFile.Reader.Option... options 421 ) throws IOException { 422 Path dataFile = new Path(dir, DATA_FILE_NAME); 423 Path indexFile = new Path(dir, INDEX_FILE_NAME); 424 425 // open the data 426 this.data = createDataFileReader(dataFile, conf, options); 427 this.firstPosition = data.getPosition(); 428 429 if (comparator == null) 430 this.comparator = 431 WritableComparator.get(data.getKeyClass(). 432 asSubclass(WritableComparable.class)); 433 else 434 this.comparator = comparator; 435 436 // open the index 437 SequenceFile.Reader.Option[] indexOptions = 438 Options.prependOptions(options, SequenceFile.Reader.file(indexFile)); 439 this.index = new SequenceFile.Reader(conf, indexOptions); 440 } 441 442 /** 443 * Override this method to specialize the type of 444 * {@link SequenceFile.Reader} returned. 445 */ 446 protected SequenceFile.Reader 447 createDataFileReader(Path dataFile, Configuration conf, 448 SequenceFile.Reader.Option... options 449 ) throws IOException { 450 SequenceFile.Reader.Option[] newOptions = 451 Options.prependOptions(options, SequenceFile.Reader.file(dataFile)); 452 return new SequenceFile.Reader(conf, newOptions); 453 } 454 455 private void readIndex() throws IOException { 456 // read the index entirely into memory 457 if (this.keys != null) 458 return; 459 this.count = 0; 460 this.positions = new long[1024]; 461 462 try { 463 int skip = INDEX_SKIP; 464 LongWritable position = new LongWritable(); 465 WritableComparable lastKey = null; 466 long lastIndex = -1; 467 ArrayList<WritableComparable> keyBuilder = new ArrayList<WritableComparable>(1024); 468 while (true) { 469 WritableComparable k = comparator.newKey(); 470 471 if (!index.next(k, position)) 472 break; 473 474 // check order to make sure comparator is compatible 475 if (lastKey != null && comparator.compare(lastKey, k) > 0) 476 throw new IOException("key out of order: "+k+" after "+lastKey); 477 lastKey = k; 478 if (skip > 0) { 479 skip--; 480 continue; // skip this entry 481 } else { 482 skip = INDEX_SKIP; // reset skip 483 } 484 485 // don't read an index that is the same as the previous one. Block 486 // compressed map files used to do this (multiple entries would point 487 // at the same block) 488 if (position.get() == lastIndex) 489 continue; 490 491 if (count == positions.length) { 492 positions = Arrays.copyOf(positions, positions.length * 2); 493 } 494 495 keyBuilder.add(k); 496 positions[count] = position.get(); 497 count++; 498 } 499 500 this.keys = keyBuilder.toArray(new WritableComparable[count]); 501 positions = Arrays.copyOf(positions, count); 502 } catch (EOFException e) { 503 LOG.warn("Unexpected EOF reading " + index + 504 " at entry #" + count + ". Ignoring."); 505 } finally { 506 indexClosed = true; 507 index.close(); 508 } 509 } 510 511 /** Re-positions the reader before its first key. */ 512 public synchronized void reset() throws IOException { 513 data.seek(firstPosition); 514 } 515 516 /** Get the key at approximately the middle of the file. Or null if the 517 * file is empty. 518 */ 519 public synchronized WritableComparable midKey() throws IOException { 520 521 readIndex(); 522 if (count == 0) { 523 return null; 524 } 525 526 return keys[(count - 1) / 2]; 527 } 528 529 /** Reads the final key from the file. 530 * 531 * @param key key to read into 532 */ 533 public synchronized void finalKey(WritableComparable key) 534 throws IOException { 535 536 long originalPosition = data.getPosition(); // save position 537 try { 538 readIndex(); // make sure index is valid 539 if (count > 0) { 540 data.seek(positions[count-1]); // skip to last indexed entry 541 } else { 542 reset(); // start at the beginning 543 } 544 while (data.next(key)) {} // scan to eof 545 546 } finally { 547 data.seek(originalPosition); // restore position 548 } 549 } 550 551 /** Positions the reader at the named key, or if none such exists, at the 552 * first entry after the named key. Returns true iff the named key exists 553 * in this map. 554 */ 555 public synchronized boolean seek(WritableComparable key) throws IOException { 556 return seekInternal(key) == 0; 557 } 558 559 /** 560 * Positions the reader at the named key, or if none such exists, at the 561 * first entry after the named key. 562 * 563 * @return 0 - exact match found 564 * < 0 - positioned at next record 565 * 1 - no more records in file 566 */ 567 private synchronized int seekInternal(WritableComparable key) 568 throws IOException { 569 return seekInternal(key, false); 570 } 571 572 /** 573 * Positions the reader at the named key, or if none such exists, at the 574 * key that falls just before or just after dependent on how the 575 * <code>before</code> parameter is set. 576 * 577 * @param before - IF true, and <code>key</code> does not exist, position 578 * file at entry that falls just before <code>key</code>. Otherwise, 579 * position file at record that sorts just after. 580 * @return 0 - exact match found 581 * < 0 - positioned at next record 582 * 1 - no more records in file 583 */ 584 private synchronized int seekInternal(WritableComparable key, 585 final boolean before) 586 throws IOException { 587 readIndex(); // make sure index is read 588 589 if (seekIndex != -1 // seeked before 590 && seekIndex+1 < count 591 && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed 592 && comparator.compare(key, nextKey) 593 >= 0) { // but after last seeked 594 // do nothing 595 } else { 596 seekIndex = binarySearch(key); 597 if (seekIndex < 0) // decode insertion point 598 seekIndex = -seekIndex-2; 599 600 if (seekIndex == -1) // belongs before first entry 601 seekPosition = firstPosition; // use beginning of file 602 else 603 seekPosition = positions[seekIndex]; // else use index 604 } 605 data.seek(seekPosition); 606 607 if (nextKey == null) 608 nextKey = comparator.newKey(); 609 610 // If we're looking for the key before, we need to keep track 611 // of the position we got the current key as well as the position 612 // of the key before it. 613 long prevPosition = -1; 614 long curPosition = seekPosition; 615 616 while (data.next(nextKey)) { 617 int c = comparator.compare(key, nextKey); 618 if (c <= 0) { // at or beyond desired 619 if (before && c != 0) { 620 if (prevPosition == -1) { 621 // We're on the first record of this index block 622 // and we've already passed the search key. Therefore 623 // we must be at the beginning of the file, so seek 624 // to the beginning of this block and return c 625 data.seek(curPosition); 626 } else { 627 // We have a previous record to back up to 628 data.seek(prevPosition); 629 data.next(nextKey); 630 // now that we've rewound, the search key must be greater than this key 631 return 1; 632 } 633 } 634 return c; 635 } 636 if (before) { 637 prevPosition = curPosition; 638 curPosition = data.getPosition(); 639 } 640 } 641 642 return 1; 643 } 644 645 private int binarySearch(WritableComparable key) { 646 int low = 0; 647 int high = count-1; 648 649 while (low <= high) { 650 int mid = (low + high) >>> 1; 651 WritableComparable midVal = keys[mid]; 652 int cmp = comparator.compare(midVal, key); 653 654 if (cmp < 0) 655 low = mid + 1; 656 else if (cmp > 0) 657 high = mid - 1; 658 else 659 return mid; // key found 660 } 661 return -(low + 1); // key not found. 662 } 663 664 /** Read the next key/value pair in the map into <code>key</code> and 665 * <code>val</code>. Returns true if such a pair exists and false when at 666 * the end of the map */ 667 public synchronized boolean next(WritableComparable key, Writable val) 668 throws IOException { 669 return data.next(key, val); 670 } 671 672 /** Return the value for the named key, or null if none exists. */ 673 public synchronized Writable get(WritableComparable key, Writable val) 674 throws IOException { 675 if (seek(key)) { 676 data.getCurrentValue(val); 677 return val; 678 } else 679 return null; 680 } 681 682 /** 683 * Finds the record that is the closest match to the specified key. 684 * Returns <code>key</code> or if it does not exist, at the first entry 685 * after the named key. 686 * 687- * @param key - key that we're trying to find 688- * @param val - data value if key is found 689- * @return - the key that was the closest match or null if eof. 690 */ 691 public synchronized WritableComparable getClosest(WritableComparable key, 692 Writable val) 693 throws IOException { 694 return getClosest(key, val, false); 695 } 696 697 /** 698 * Finds the record that is the closest match to the specified key. 699 * 700 * @param key - key that we're trying to find 701 * @param val - data value if key is found 702 * @param before - IF true, and <code>key</code> does not exist, return 703 * the first entry that falls just before the <code>key</code>. Otherwise, 704 * return the record that sorts just after. 705 * @return - the key that was the closest match or null if eof. 706 */ 707 public synchronized WritableComparable getClosest(WritableComparable key, 708 Writable val, final boolean before) 709 throws IOException { 710 711 int c = seekInternal(key, before); 712 713 // If we didn't get an exact match, and we ended up in the wrong 714 // direction relative to the query key, return null since we 715 // must be at the beginning or end of the file. 716 if ((!before && c > 0) || 717 (before && c < 0)) { 718 return null; 719 } 720 721 data.getCurrentValue(val); 722 return nextKey; 723 } 724 725 /** Close the map. */ 726 public synchronized void close() throws IOException { 727 if (!indexClosed) { 728 index.close(); 729 } 730 data.close(); 731 } 732 733 } 734 735 /** Renames an existing map directory. */ 736 public static void rename(FileSystem fs, String oldName, String newName) 737 throws IOException { 738 Path oldDir = new Path(oldName); 739 Path newDir = new Path(newName); 740 if (!fs.rename(oldDir, newDir)) { 741 throw new IOException("Could not rename " + oldDir + " to " + newDir); 742 } 743 } 744 745 /** Deletes the named map file. */ 746 public static void delete(FileSystem fs, String name) throws IOException { 747 Path dir = new Path(name); 748 Path data = new Path(dir, DATA_FILE_NAME); 749 Path index = new Path(dir, INDEX_FILE_NAME); 750 751 fs.delete(data, true); 752 fs.delete(index, true); 753 fs.delete(dir, true); 754 } 755 756 /** 757 * This method attempts to fix a corrupt MapFile by re-creating its index. 758 * @param fs filesystem 759 * @param dir directory containing the MapFile data and index 760 * @param keyClass key class (has to be a subclass of Writable) 761 * @param valueClass value class (has to be a subclass of Writable) 762 * @param dryrun do not perform any changes, just report what needs to be done 763 * @return number of valid entries in this MapFile, or -1 if no fixing was needed 764 * @throws Exception 765 */ 766 public static long fix(FileSystem fs, Path dir, 767 Class<? extends Writable> keyClass, 768 Class<? extends Writable> valueClass, boolean dryrun, 769 Configuration conf) throws Exception { 770 String dr = (dryrun ? "[DRY RUN ] " : ""); 771 Path data = new Path(dir, DATA_FILE_NAME); 772 Path index = new Path(dir, INDEX_FILE_NAME); 773 int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128); 774 if (!fs.exists(data)) { 775 // there's nothing we can do to fix this! 776 throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this."); 777 } 778 if (fs.exists(index)) { 779 // no fixing needed 780 return -1; 781 } 782 SequenceFile.Reader dataReader = 783 new SequenceFile.Reader(conf, SequenceFile.Reader.file(data)); 784 if (!dataReader.getKeyClass().equals(keyClass)) { 785 throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() + 786 ", got " + dataReader.getKeyClass().getName()); 787 } 788 if (!dataReader.getValueClass().equals(valueClass)) { 789 throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() + 790 ", got " + dataReader.getValueClass().getName()); 791 } 792 long cnt = 0L; 793 Writable key = ReflectionUtils.newInstance(keyClass, conf); 794 Writable value = ReflectionUtils.newInstance(valueClass, conf); 795 SequenceFile.Writer indexWriter = null; 796 if (!dryrun) { 797 indexWriter = 798 SequenceFile.createWriter(conf, 799 SequenceFile.Writer.file(index), 800 SequenceFile.Writer.keyClass(keyClass), 801 SequenceFile.Writer.valueClass 802 (LongWritable.class)); 803 } 804 try { 805 long pos = 0L; 806 LongWritable position = new LongWritable(); 807 while(dataReader.next(key, value)) { 808 cnt++; 809 if (cnt % indexInterval == 0) { 810 position.set(pos); 811 if (!dryrun) indexWriter.append(key, position); 812 } 813 pos = dataReader.getPosition(); 814 } 815 } catch(Throwable t) { 816 // truncated data file. swallow it. 817 } 818 dataReader.close(); 819 if (!dryrun) indexWriter.close(); 820 return cnt; 821 } 822 823 824 public static void main(String[] args) throws Exception { 825 String usage = "Usage: MapFile inFile outFile"; 826 827 if (args.length != 2) { 828 System.err.println(usage); 829 System.exit(-1); 830 } 831 832 String in = args[0]; 833 String out = args[1]; 834 835 Configuration conf = new Configuration(); 836 FileSystem fs = FileSystem.getLocal(conf); 837 MapFile.Reader reader = new MapFile.Reader(fs, in, conf); 838 MapFile.Writer writer = 839 new MapFile.Writer(conf, fs, out, 840 reader.getKeyClass().asSubclass(WritableComparable.class), 841 reader.getValueClass()); 842 843 WritableComparable key = 844 ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf); 845 Writable value = 846 ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf); 847 848 while (reader.next(key, value)) // copy all entries 849 writer.append(key, value); 850 851 writer.close(); 852 } 853 854}