/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

/** A file-based map from keys to values.
 *
 * <p>A map is a directory containing two files, the <code>data</code> file,
 * containing all keys and values in the map, and a smaller <code>index</code>
 * file, containing a fraction of the keys.  The fraction is determined by
 * {@link Writer#getIndexInterval()}.
 *
 * <p>The index file is read entirely into memory.  Thus key implementations
 * should try to keep themselves small.
 *
 * <p>Map files are created by adding entries in-order.  To maintain a large
 * database, perform updates by copying the previous version of a database and
 * merging in a sorted change list, to create a new version of the database in
 * a new file.  Sorting large change lists can be done with {@link
 * SequenceFile.Sorter}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MapFile {
  private static final Log LOG = LogFactory.getLog(MapFile.class);

  /** The name of the index file. */
  public static final String INDEX_FILE_NAME = "index";

  /** The name of the data file. */
  public static final String DATA_FILE_NAME = "data";

  protected MapFile() {}                          // no public ctor

  /** Writes a new map. */
  public static class Writer implements java.io.Closeable {
    private SequenceFile.Writer data;
    private SequenceFile.Writer index;

    final private static String INDEX_INTERVAL = "io.map.index.interval";
    private int indexInterval = 128;

    /** Number of entries appended to the data file so far. */
    private long size;
    /** Reusable holder for the data-file offset written to the index. */
    private LongWritable position = new LongWritable();

    // the following fields are used only for checking key order
    private WritableComparator comparator;
    private DataInputBuffer inBuf = new DataInputBuffer();
    private DataOutputBuffer outBuf = new DataOutputBuffer();
    private WritableComparable lastKey;

    /** What's the position (in bytes) we wrote when we got the last index */
    private long lastIndexPos = -1;

    /**
     * What was size when we last wrote an index. Set to MIN_VALUE to ensure that
     * we have an index at position zero -- midKey will throw an exception if this
     * is not the case
     */
    private long lastIndexKeyCount = Long.MIN_VALUE;


    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass,
                  Class valClass) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
    }

    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
           compression(compress), progressable(progress));
    }

    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress, CompressionCodec codec,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
           compression(compress, codec), progressable(progress));
    }

    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass),
           valueClass(valClass), compression(compress));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass
                  ) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass), compression(compress));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass), compression(compress),
           progressable(progress));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress, CompressionCodec codec,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass), compression(compress, codec),
           progressable(progress));
    }

    // our options are a superset of sequence file writer options
    public static interface Option extends SequenceFile.Writer.Option { }

    /** Option carrying the key class of the map. */
    private static class KeyClassOption extends Options.ClassOption
                                        implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    /** Option carrying the comparator used to order the map's keys. */
    private static class ComparatorOption implements Option {
      private final WritableComparator value;
      ComparatorOption(WritableComparator value) {
        this.value = value;
      }
      WritableComparator getValue() {
        return value;
      }
    }

    /** Creates the option that sets the key class. */
    public static Option keyClass(Class<? extends WritableComparable> value) {
      return new KeyClassOption(value);
    }

    /** Creates the option that sets the key comparator. */
    public static Option comparator(WritableComparator value) {
      return new ComparatorOption(value);
    }

    /** Creates the option that sets the value class. */
    public static SequenceFile.Writer.Option valueClass(Class<?> value) {
      return SequenceFile.Writer.valueClass(value);
    }

    /** Creates the option that sets the compression type. */
    public static
    SequenceFile.Writer.Option compression(CompressionType type) {
      return SequenceFile.Writer.compression(type);
    }

    /** Creates the option that sets the compression type and codec. */
    public static
    SequenceFile.Writer.Option compression(CompressionType type,
        CompressionCodec codec) {
      return SequenceFile.Writer.compression(type, codec);
    }

    /** Creates the option that sets the progress reporter. */
    public static SequenceFile.Writer.Option progressable(Progressable value) {
      return SequenceFile.Writer.progressable(value);
    }

    /**
     * Create the map in the given directory.
     *
     * @param conf configuration; also supplies the index interval
     * @param dirName directory that will hold the data and index files
     * @param opts writer options; exactly one of the key class option and the
     *             comparator option must be present
     * @throws IllegalArgumentException if neither or both of the key class
     *         and comparator options are given
     * @throws IOException if the directory or either file cannot be created
     */
    @SuppressWarnings("unchecked")
    public Writer(Configuration conf,
                  Path dirName,
                  SequenceFile.Writer.Option... opts
                  ) throws IOException {
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ComparatorOption comparatorOption =
        Options.getOption(ComparatorOption.class, opts);
      // true when both options are absent or both are present
      if ((keyClassOption == null) == (comparatorOption == null)) {
        throw new IllegalArgumentException("key class or comparator option "
                                           + "must be set");
      }
      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);

      Class<? extends WritableComparable> keyClass;
      if (keyClassOption == null) {
        this.comparator = comparatorOption.getValue();
        keyClass = comparator.getKeyClass();
      } else {
        keyClass=
          (Class<? extends WritableComparable>) keyClassOption.getValue();
        this.comparator = WritableComparator.get(keyClass);
      }
      this.lastKey = comparator.newKey();
      FileSystem fs = dirName.getFileSystem(conf);

      if (!fs.mkdirs(dirName)) {
        throw new IOException("Mkdirs failed to create directory " + dirName);
      }
      Path dataFile = new Path(dirName, DATA_FILE_NAME);
      Path indexFile = new Path(dirName, INDEX_FILE_NAME);

      SequenceFile.Writer.Option[] dataOptions =
        Options.prependOptions(opts,
                               SequenceFile.Writer.file(dataFile),
                               SequenceFile.Writer.keyClass(keyClass));
      this.data = SequenceFile.createWriter(conf, dataOptions);

      try {
        SequenceFile.Writer.Option[] indexOptions =
          Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
              SequenceFile.Writer.keyClass(keyClass),
              SequenceFile.Writer.valueClass(LongWritable.class),
              SequenceFile.Writer.compression(CompressionType.BLOCK));
        this.index = SequenceFile.createWriter(conf, indexOptions);
      } catch (IOException e) {
        // don't leak the already-open data writer when the index can't be
        // created
        IOUtils.cleanup(LOG, this.data);
        throw e;
      }
    }

    /** The number of entries that are added before an index entry is added.*/
    public int getIndexInterval() { return indexInterval; }

    /** Sets the index interval.
     * @see #getIndexInterval()
     */
    public void setIndexInterval(int interval) { indexInterval = interval; }

    /** Sets the index interval and stores it in conf
     * @see #getIndexInterval()
     */
    public static void setIndexInterval(Configuration conf, int interval) {
      conf.setInt(INDEX_INTERVAL, interval);
    }

    /** Close the map. Both files are closed even if closing the data file
     * fails. */
    @Override
    public synchronized void close() throws IOException {
      try {
        data.close();
      } finally {
        index.close();
      }
    }

    /** Append a key/value pair to the map.  The key must be greater or equal
     * to the previous key added to the map. */
    public synchronized void append(WritableComparable key, Writable val)
      throws IOException {

      checkKey(key);

      long pos = data.getLength();
      // Only write an index if we've changed positions. In a block compressed
      // file, this means we write an entry at the start of each block
      if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
        position.set(pos);                        // point to current eof
        index.append(key, position);
        lastIndexPos = pos;
        lastIndexKeyCount = size;
      }

      data.append(key, val);                      // append key/value to data
      size++;
    }

    /**
     * Verifies that <code>key</code> does not sort before the previously
     * appended key, then remembers a copy of it for the next check.
     *
     * @throws IOException if the key is out of order
     */
    private void checkKey(WritableComparable key) throws IOException {
      // check that keys are well-ordered
      if (size != 0 && comparator.compare(lastKey, key) > 0)
        throw new IOException("key out of order: "+key+" after "+lastKey);

      // update lastKey with a copy of key by writing and reading
      outBuf.reset();
      key.write(outBuf);                          // write new key

      inBuf.reset(outBuf.getData(), outBuf.getLength());
      lastKey.readFields(inBuf);                  // read into lastKey
    }

  }

  /** Provide access to an existing map. */
  public static class Reader implements java.io.Closeable {

    /** Number of index entries to skip between each entry.  Zero by default.
     * Setting this to values larger than zero can facilitate opening large map
     * files using less memory. */
    private int INDEX_SKIP = 0;

    private WritableComparator comparator;

    private WritableComparable nextKey;
    private long seekPosition = -1;
    private int seekIndex = -1;
    private long firstPosition;

    // the data, on disk
    private SequenceFile.Reader data;
    private SequenceFile.Reader index;

    // whether the index Reader was closed
    private boolean indexClosed = false;

    // the index, in memory
    private int count = -1;
    private WritableComparable[] keys;
    private long[] positions;

    /** Returns the class of keys in this file. */
    public Class<?> getKeyClass() { return data.getKeyClass(); }

    /** Returns the class of values in this file. */
    public Class<?> getValueClass() { return data.getValueClass(); }

    public static interface Option extends SequenceFile.Reader.Option {}

    /** Creates the option that sets the key comparator. */
    public static Option comparator(WritableComparator value) {
      return new ComparatorOption(value);
    }

    /** Option carrying the comparator used to order the map's keys. */
    static class ComparatorOption implements Option {
      private final WritableComparator value;
      ComparatorOption(WritableComparator value) {
        this.value = value;
      }
      WritableComparator getValue() {
        return value;
      }
    }

    /**
     * Construct a map reader for the map stored in <code>dir</code>.
     *
     * @param dir directory containing the data and index files
     * @param conf configuration; also supplies <code>io.map.index.skip</code>
     * @param opts reader options, optionally including a comparator option
     */
    public Reader(Path dir, Configuration conf,
                  SequenceFile.Reader.Option... opts) throws IOException {
      ComparatorOption comparatorOption =
        Options.getOption(ComparatorOption.class, opts);
      WritableComparator comparator =
        comparatorOption == null ? null : comparatorOption.getValue();
      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
      open(dir, comparator, conf, opts);
    }

    /** Construct a map reader for the named map.
     * @deprecated
     */
    @Deprecated
    public Reader(FileSystem fs, String dirName,
                  Configuration conf) throws IOException {
      this(new Path(dirName), conf);
    }

    /** Construct a map reader for the named map using the named comparator.
     * @deprecated
     */
    @Deprecated
    public Reader(FileSystem fs, String dirName, WritableComparator comparator,
                  Configuration conf) throws IOException {
      this(new Path(dirName), conf, comparator(comparator));
    }

    /**
     * Opens the data and index files of the map stored in <code>dir</code>.
     *
     * @param comparator key comparator, or null to derive one from the key
     *                   class recorded in the data file
     */
    protected synchronized void open(Path dir,
                                     WritableComparator comparator,
                                     Configuration conf,
                                     SequenceFile.Reader.Option... options
                                     ) throws IOException {
      Path dataFile = new Path(dir, DATA_FILE_NAME);
      Path indexFile = new Path(dir, INDEX_FILE_NAME);

      // open the data
      this.data = createDataFileReader(dataFile, conf, options);
      try {
        this.firstPosition = data.getPosition();

        if (comparator == null)
          this.comparator =
            WritableComparator.get(data.getKeyClass().
                                     asSubclass(WritableComparable.class));
        else
          this.comparator = comparator;

        // open the index
        SequenceFile.Reader.Option[] indexOptions =
          Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
        this.index = new SequenceFile.Reader(conf, indexOptions);
      } catch (IOException e) {
        // don't leak the already-open data reader when the index can't be
        // opened
        IOUtils.cleanup(LOG, this.data);
        throw e;
      }
    }

    /**
     * Override this method to specialize the type of
     * {@link SequenceFile.Reader} returned.
     */
    protected SequenceFile.Reader
      createDataFileReader(Path dataFile, Configuration conf,
                           SequenceFile.Reader.Option... options
                           ) throws IOException {
      SequenceFile.Reader.Option[] newOptions =
        Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
      return new SequenceFile.Reader(conf, newOptions);
    }

    /**
     * Loads the index file into <code>keys</code> and <code>positions</code>
     * on first use and closes the index reader. Later calls return at once.
     * A truncated index is tolerated: the entries read before the unexpected
     * EOF are kept.
     *
     * @throws IOException if index keys are out of order for the comparator
     */
    private void readIndex() throws IOException {
      // read the index entirely into memory
      if (this.keys != null)
        return;
      this.count = 0;
      this.positions = new long[1024];

      ArrayList<WritableComparable> keyBuilder =
        new ArrayList<WritableComparable>(1024);
      try {
        int skip = INDEX_SKIP;
        LongWritable position = new LongWritable();
        WritableComparable lastKey = null;
        long lastIndex = -1;
        while (true) {
          WritableComparable k = comparator.newKey();

          if (!index.next(k, position))
            break;

          // check order to make sure comparator is compatible
          if (lastKey != null && comparator.compare(lastKey, k) > 0)
            throw new IOException("key out of order: "+k+" after "+lastKey);
          lastKey = k;
          if (skip > 0) {
            skip--;
            continue;                             // skip this entry
          } else {
            skip = INDEX_SKIP;                    // reset skip
          }

          // don't read an index that is the same as the previous one. Block
          // compressed map files used to do this (multiple entries would point
          // at the same block)
          if (position.get() == lastIndex)
            continue;

          if (count == positions.length) {
            positions = Arrays.copyOf(positions, positions.length * 2);
          }

          keyBuilder.add(k);
          positions[count] = position.get();
          lastIndex = position.get();             // remember for the check above
          count++;
        }

        this.keys = keyBuilder.toArray(new WritableComparable[count]);
        positions = Arrays.copyOf(positions, count);
      } catch (EOFException e) {
        LOG.warn("Unexpected EOF reading " + index +
                              " at entry #" + count + ". Ignoring.");
        // keep what was read so far; leaving keys null here would make later
        // lookups fail with a NullPointerException
        this.keys = keyBuilder.toArray(new WritableComparable[count]);
        positions = Arrays.copyOf(positions, count);
      } finally {
        indexClosed = true;
        index.close();
      }
    }

    /** Re-positions the reader before its first key. */
    public synchronized void reset() throws IOException {
      data.seek(firstPosition);
    }

    /** Get the key at approximately the middle of the file. Or null if the
     *  file is empty.
     */
    public synchronized WritableComparable midKey() throws IOException {

      readIndex();
      if (count == 0) {
        return null;
      }

      return keys[(count - 1) / 2];
    }

    /** Reads the final key from the file.
     *
     * @param key key to read into
     */
    public synchronized void finalKey(WritableComparable key)
      throws IOException {

      long originalPosition = data.getPosition(); // save position
      try {
        readIndex();                              // make sure index is valid
        if (count > 0) {
          data.seek(positions[count-1]);          // skip to last indexed entry
        } else {
          reset();                                // start at the beginning
        }
        while (data.next(key)) {}                 // scan to eof

      } finally {
        data.seek(originalPosition);              // restore position
      }
    }

    /** Positions the reader at the named key, or if none such exists, at the
     * first entry after the named key.  Returns true iff the named key exists
     * in this map.
     */
    public synchronized boolean seek(WritableComparable key) throws IOException {
      return seekInternal(key) == 0;
    }

    /**
     * Positions the reader at the named key, or if none such exists, at the
     * first entry after the named key.
     *
     * @return  0   - exact match found
     *          < 0 - positioned at next record
     *          1   - no more records in file
     */
    private synchronized int seekInternal(WritableComparable key)
      throws IOException {
      return seekInternal(key, false);
    }

    /**
     * Positions the reader at the named key, or if none such exists, at the
     * key that falls just before or just after dependent on how the
     * <code>before</code> parameter is set.
     *
     * @param before - IF true, and <code>key</code> does not exist, position
     * file at entry that falls just before <code>key</code>.  Otherwise,
     * position file at record that sorts just after.
     * @return  0   - exact match found
     *          < 0 - positioned at next record
     *          1   - no more records in file
     */
    private synchronized int seekInternal(WritableComparable key,
        final boolean before)
      throws IOException {
      readIndex();                                // make sure index is read

      if (seekIndex != -1                         // seeked before
          && seekIndex+1 < count
          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
          && comparator.compare(key, nextKey)
          >= 0) {                                 // but after last seeked
        // do nothing
      } else {
        seekIndex = binarySearch(key);
        if (seekIndex < 0)                        // decode insertion point
          seekIndex = -seekIndex-2;

        if (seekIndex == -1)                      // belongs before first entry
          seekPosition = firstPosition;           // use beginning of file
        else
          seekPosition = positions[seekIndex];    // else use index
      }
      data.seek(seekPosition);

      if (nextKey == null)
        nextKey = comparator.newKey();

      // If we're looking for the key before, we need to keep track
      // of the position we got the current key as well as the position
      // of the key before it.
      long prevPosition = -1;
      long curPosition = seekPosition;

      while (data.next(nextKey)) {
        int c = comparator.compare(key, nextKey);
        if (c <= 0) {                             // at or beyond desired
          if (before && c != 0) {
            if (prevPosition == -1) {
              // We're on the first record of this index block
              // and we've already passed the search key. Therefore
              // we must be at the beginning of the file, so seek
              // to the beginning of this block and return c
              data.seek(curPosition);
            } else {
              // We have a previous record to back up to
              data.seek(prevPosition);
              data.next(nextKey);
              // now that we've rewound, the search key must be greater than this key
              return 1;
            }
          }
          return c;
        }
        if (before) {
          prevPosition = curPosition;
          curPosition = data.getPosition();
        }
      }

      return 1;
    }

    /**
     * Binary-searches the in-memory index keys.
     *
     * @return the index of the matching entry, or
     *         <code>-(insertion point + 1)</code> if there is none
     */
    private int binarySearch(WritableComparable key) {
      int low = 0;
      int high = count-1;

      while (low <= high) {
        int mid = (low + high) >>> 1;
        WritableComparable midVal = keys[mid];
        int cmp = comparator.compare(midVal, key);

        if (cmp < 0)
          low = mid + 1;
        else if (cmp > 0)
          high = mid - 1;
        else
          return mid;                             // key found
      }
      return -(low + 1);                          // key not found.
    }

    /** Read the next key/value pair in the map into <code>key</code> and
     * <code>val</code>.  Returns true if such a pair exists and false when at
     * the end of the map */
    public synchronized boolean next(WritableComparable key, Writable val)
      throws IOException {
      return data.next(key, val);
    }

    /** Return the value for the named key, or null if none exists. */
    public synchronized Writable get(WritableComparable key, Writable val)
      throws IOException {
      if (seek(key)) {
        data.getCurrentValue(val);
        return val;
      } else
        return null;
    }

    /**
     * Finds the record that is the closest match to the specified key.
     * Returns <code>key</code> or if it does not exist, at the first entry
     * after the named key.
     *
     * @param key       - key that we're trying to find
     * @param val       - data value if key is found
     * @return          - the key that was the closest match or null if eof.
     */
    public synchronized WritableComparable getClosest(WritableComparable key,
      Writable val)
    throws IOException {
      return getClosest(key, val, false);
    }

    /**
     * Finds the record that is the closest match to the specified key.
     *
     * @param key       - key that we're trying to find
     * @param val       - data value if key is found
     * @param before    - IF true, and <code>key</code> does not exist, return
     * the first entry that falls just before the <code>key</code>.  Otherwise,
     * return the record that sorts just after.
     * @return          - the key that was the closest match or null if eof.
     */
    public synchronized WritableComparable getClosest(WritableComparable key,
        Writable val, final boolean before)
      throws IOException {

      int c = seekInternal(key, before);

      // If we didn't get an exact match, and we ended up in the wrong
      // direction relative to the query key, return null since we
      // must be at the beginning or end of the file.
      if ((!before && c > 0) ||
          (before && c < 0)) {
        return null;
      }

      data.getCurrentValue(val);
      return nextKey;
    }

    /** Close the map. The data file is closed even if closing the index
     * fails. */
    @Override
    public synchronized void close() throws IOException {
      try {
        if (!indexClosed) {
          index.close();
        }
      } finally {
        data.close();
      }
    }

  }

  /** Renames an existing map directory. */
  public static void rename(FileSystem fs, String oldName, String newName)
    throws IOException {
    Path oldDir = new Path(oldName);
    Path newDir = new Path(newName);
    if (!fs.rename(oldDir, newDir)) {
      throw new IOException("Could not rename " + oldDir + " to " + newDir);
    }
  }

  /** Deletes the named map file. */
  public static void delete(FileSystem fs, String name) throws IOException {
    Path dir = new Path(name);
    Path data = new Path(dir, DATA_FILE_NAME);
    Path index = new Path(dir, INDEX_FILE_NAME);

    fs.delete(data, true);
    fs.delete(index, true);
    fs.delete(dir, true);
  }

  /**
   * This method attempts to fix a corrupt MapFile by re-creating its index.
   * @param fs         filesystem
   * @param dir        directory containing the MapFile data and index
   * @param keyClass   key class (has to be a subclass of Writable)
   * @param valueClass value class (has to be a subclass of Writable)
   * @param dryrun     do not perform any changes, just report what needs to be done
   * @param conf       configuration; also supplies the index interval
   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
   * @throws Exception if the data file is missing or its key/value classes do
   *                   not match the expected ones
   */
  public static long fix(FileSystem fs, Path dir,
                         Class<? extends Writable> keyClass,
                         Class<? extends Writable> valueClass, boolean dryrun,
                         Configuration conf) throws Exception {
    String dr = (dryrun ? "[DRY RUN ] " : "");
    Path data = new Path(dir, DATA_FILE_NAME);
    Path index = new Path(dir, INDEX_FILE_NAME);
    int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
    if (!fs.exists(data)) {
      // there's nothing we can do to fix this!
      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
    }
    if (fs.exists(index)) {
      // no fixing needed
      return -1;
    }
    SequenceFile.Reader dataReader =
      new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
    SequenceFile.Writer indexWriter = null;
    long cnt = 0L;
    try {
      if (!dataReader.getKeyClass().equals(keyClass)) {
        throw new Exception(dr + "Wrong key class in " + dir + ", expected " + keyClass.getName() +
                            ", got " + dataReader.getKeyClass().getName());
      }
      if (!dataReader.getValueClass().equals(valueClass)) {
        throw new Exception(dr + "Wrong value class in " + dir + ", expected " + valueClass.getName() +
                            ", got " + dataReader.getValueClass().getName());
      }
      Writable key = ReflectionUtils.newInstance(keyClass, conf);
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      if (!dryrun) {
        indexWriter =
          SequenceFile.createWriter(conf,
                                    SequenceFile.Writer.file(index),
                                    SequenceFile.Writer.keyClass(keyClass),
                                    SequenceFile.Writer.valueClass
                                      (LongWritable.class));
      }
      try {
        // offset of the record about to be read; starts after the file header
        // so that every index entry points at a real record
        long pos = dataReader.getPosition();
        LongWritable position = new LongWritable();
        while(dataReader.next(key, value)) {
          cnt++;
          if (cnt % indexInterval == 0) {
            position.set(pos);
            if (!dryrun) indexWriter.append(key, position);
          }
          pos = dataReader.getPosition();
        }
      } catch(Throwable t) {
        // truncated data file. keep the entries indexed so far, but leave a
        // trace of what went wrong.
        LOG.warn(dr + "Stopped reading " + data + " after " + cnt
                 + " entries", t);
      }
    } finally {
      // release both files on every path, including the class-mismatch errors
      try {
        dataReader.close();
      } finally {
        if (indexWriter != null) indexWriter.close();
      }
    }
    return cnt;
  }


  /**
   * Copies the map stored in the local directory <code>inFile</code> into a
   * new map in the local directory <code>outFile</code>.
   */
  public static void main(String[] args) throws Exception {
    String usage = "Usage: MapFile inFile outFile";

    if (args.length != 2) {
      System.err.println(usage);
      System.exit(-1);
    }

    String in = args[0];
    String out = args[1];

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    MapFile.Reader reader = null;
    MapFile.Writer writer = null;
    try {
      reader = new MapFile.Reader(fs, in, conf);
      writer =
        new MapFile.Writer(conf, fs, out,
            reader.getKeyClass().asSubclass(WritableComparable.class),
            reader.getValueClass());

      WritableComparable key = ReflectionUtils.newInstance(reader.getKeyClass()
        .asSubclass(WritableComparable.class), conf);
      Writable value = ReflectionUtils.newInstance(reader.getValueClass()
        .asSubclass(Writable.class), conf);

      while (reader.next(key, value))               // copy all entries
        writer.append(key, value);
    } finally {
      IOUtils.cleanup(LOG, writer, reader);
    }
  }
}