/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

/** A file-based map from keys to values.
 *
 * <p>A map is a directory containing two files, the <code>data</code> file,
 * containing all keys and values in the map, and a smaller <code>index</code>
 * file, containing a fraction of the keys.  The fraction is determined by
 * {@link Writer#getIndexInterval()}.
 *
 * <p>The index file is read entirely into memory.  Thus key implementations
 * should try to keep themselves small.
 *
 * <p>Map files are created by adding entries in-order.  To maintain a large
 * database, perform updates by copying the previous version of a database and
 * merging in a sorted change list, to create a new version of the database in
 * a new file.  Sorting large change lists can be done with {@link
 * SequenceFile.Sorter}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MapFile {
  private static final Log LOG = LogFactory.getLog(MapFile.class);

  /** The name of the index file. */
  public static final String INDEX_FILE_NAME = "index";

  /** The name of the data file. */
  public static final String DATA_FILE_NAME = "data";

  protected MapFile() {}                          // no public ctor

  /** Writes a new map. */
  public static class Writer implements java.io.Closeable {
    private SequenceFile.Writer data;
    private SequenceFile.Writer index;

    final private static String INDEX_INTERVAL = "io.map.index.interval";
    private int indexInterval = 128;

    /** Number of entries appended so far. */
    private long size;
    /** Reusable holder for the data-file offset written to the index. */
    private LongWritable position = new LongWritable();

    // the following fields are used only for checking key order
    private WritableComparator comparator;
    private DataInputBuffer inBuf = new DataInputBuffer();
    private DataOutputBuffer outBuf = new DataOutputBuffer();
    private WritableComparable lastKey;

    /** What's the position (in bytes) we wrote when we got the last index */
    private long lastIndexPos = -1;

    /**
     * What was size when we last wrote an index. Set to MIN_VALUE to ensure
     * that the very first appended entry always produces an index entry.
     */
    private long lastIndexKeyCount = Long.MIN_VALUE;


    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass,
                  Class valClass) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
    }

    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
           compression(compress), progressable(progress));
    }

    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress, CompressionCodec codec,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
           compression(compress, codec), progressable(progress));
    }

    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass),
           valueClass(valClass), compression(compress));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass
                  ) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass), compression(compress));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass), compression(compress),
           progressable(progress));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress, CompressionCodec codec,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass), compression(compress, codec),
           progressable(progress));
    }

    // our options are a superset of sequence file writer options
    public static interface Option extends SequenceFile.Writer.Option { }

    private static class KeyClassOption extends Options.ClassOption
                                        implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    private static class ComparatorOption implements Option {
      private final WritableComparator value;
      ComparatorOption(WritableComparator value) {
        this.value = value;
      }
      WritableComparator getValue() {
        return value;
      }
    }

    /** Option naming the key class; mutually exclusive with
     * {@link #comparator(WritableComparator)}. */
    public static Option keyClass(Class<? extends WritableComparable> value) {
      return new KeyClassOption(value);
    }

    /** Option naming the key comparator; mutually exclusive with
     * {@link #keyClass(Class)}. */
    public static Option comparator(WritableComparator value) {
      return new ComparatorOption(value);
    }

    /** Option naming the value class; delegates to
     * {@link SequenceFile.Writer#valueClass(Class)}. */
    public static SequenceFile.Writer.Option valueClass(Class<?> value) {
      return SequenceFile.Writer.valueClass(value);
    }

    /** Option selecting the compression type; delegates to
     * {@link SequenceFile.Writer#compression(CompressionType)}. */
    public static
    SequenceFile.Writer.Option compression(CompressionType type) {
      return SequenceFile.Writer.compression(type);
    }

    /** Option selecting the compression type and codec; delegates to
     * {@link SequenceFile.Writer#compression(CompressionType, CompressionCodec)}.
     */
    public static
    SequenceFile.Writer.Option compression(CompressionType type,
        CompressionCodec codec) {
      return SequenceFile.Writer.compression(type, codec);
    }

    /** Option supplying a progress callback; delegates to
     * {@link SequenceFile.Writer#progressable(Progressable)}. */
    public static SequenceFile.Writer.Option progressable(Progressable value) {
      return SequenceFile.Writer.progressable(value);
    }

    /**
     * Create a map in directory <code>dirName</code>, opening both the data
     * and the index file for writing.
     *
     * @param conf configuration; also supplies the initial index interval
     *             (<code>io.map.index.interval</code>)
     * @param dirName directory that will hold the data and index files
     * @param opts writer options; exactly one of the key class option and the
     *             comparator option must be present
     * @throws IllegalArgumentException if neither or both of the key class
     *             and comparator options are given
     * @throws IOException if the directory or either file cannot be created
     */
    @SuppressWarnings("unchecked")
    public Writer(Configuration conf,
                  Path dirName,
                  SequenceFile.Writer.Option... opts
                  ) throws IOException {
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ComparatorOption comparatorOption =
        Options.getOption(ComparatorOption.class, opts);
      if ((keyClassOption == null) == (comparatorOption == null)) {
        throw new IllegalArgumentException("key class or comparator option "
                                           + "must be set");
      }
      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);

      Class<? extends WritableComparable> keyClass;
      if (keyClassOption == null) {
        this.comparator = comparatorOption.getValue();
        keyClass = comparator.getKeyClass();
      } else {
        keyClass =
          (Class<? extends WritableComparable>) keyClassOption.getValue();
        this.comparator = WritableComparator.get(keyClass, conf);
      }
      this.lastKey = comparator.newKey();
      FileSystem fs = dirName.getFileSystem(conf);

      if (!fs.mkdirs(dirName)) {
        throw new IOException("Mkdirs failed to create directory " + dirName);
      }
      Path dataFile = new Path(dirName, DATA_FILE_NAME);
      Path indexFile = new Path(dirName, INDEX_FILE_NAME);

      SequenceFile.Writer.Option[] dataOptions =
        Options.prependOptions(opts,
                               SequenceFile.Writer.file(dataFile),
                               SequenceFile.Writer.keyClass(keyClass));
      this.data = SequenceFile.createWriter(conf, dataOptions);

      // Do not leak the already-open data writer if the index cannot be opened.
      boolean indexOpened = false;
      try {
        SequenceFile.Writer.Option[] indexOptions =
          Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
              SequenceFile.Writer.keyClass(keyClass),
              SequenceFile.Writer.valueClass(LongWritable.class),
              SequenceFile.Writer.compression(CompressionType.BLOCK));
        this.index = SequenceFile.createWriter(conf, indexOptions);
        indexOpened = true;
      } finally {
        if (!indexOpened) {
          IOUtils.closeStream(this.data);
        }
      }
    }

    /** The number of entries that are added before an index entry is added.*/
    public int getIndexInterval() { return indexInterval; }

    /** Sets the index interval.
     * @see #getIndexInterval()
     */
    public void setIndexInterval(int interval) { indexInterval = interval; }

    /** Sets the index interval and stores it in conf
     * @see #getIndexInterval()
     */
    public static void setIndexInterval(Configuration conf, int interval) {
      conf.setInt(INDEX_INTERVAL, interval);
    }

    /** Close the map.  The index file is closed even when closing the data
     * file fails. */
    @Override
    public synchronized void close() throws IOException {
      try {
        data.close();
      } finally {
        index.close();
      }
    }

    /** Append a key/value pair to the map.  The key must be greater or equal
     * to the previous key added to the map.
     *
     * @throws IOException if the key sorts before the previously appended key
     *                     or the underlying write fails
     */
    public synchronized void append(WritableComparable key, Writable val)
      throws IOException {

      checkKey(key);

      long pos = data.getLength();
      // Only write an index if we've changed positions. In a block compressed
      // file, this means we write an entry at the start of each block
      if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
        position.set(pos);                        // point to current eof
        index.append(key, position);
        lastIndexPos = pos;
        lastIndexKeyCount = size;
      }

      data.append(key, val);                      // append key/value to data
      size++;
    }

    /** Verifies that <code>key</code> does not sort before the last appended
     * key, then remembers a copy of it for the next check. */
    private void checkKey(WritableComparable key) throws IOException {
      // check that keys are well-ordered
      if (size != 0 && comparator.compare(lastKey, key) > 0)
        throw new IOException("key out of order: "+key+" after "+lastKey);

      // update lastKey with a copy of key by writing and reading
      outBuf.reset();
      key.write(outBuf);                          // write new key

      inBuf.reset(outBuf.getData(), outBuf.getLength());
      lastKey.readFields(inBuf);                  // read into lastKey
    }

  }

  /** Provide access to an existing map. */
  public static class Reader implements java.io.Closeable {

    /** Number of index entries to skip between each entry.  Zero by default.
     * Setting this to values larger than zero can facilitate opening large map
     * files using less memory. */
    private int INDEX_SKIP = 0;

    private WritableComparator comparator;

    private WritableComparable nextKey;
    private long seekPosition = -1;
    private int seekIndex = -1;
    private long firstPosition;

    // the data, on disk
    private SequenceFile.Reader data;
    private SequenceFile.Reader index;

    // whether the index Reader was closed
    private boolean indexClosed = false;

    // the index, in memory
    private int count = -1;
    private WritableComparable[] keys;
    private long[] positions;

    /** Returns the class of keys in this file. */
    public Class<?> getKeyClass() { return data.getKeyClass(); }

    /** Returns the class of values in this file. */
    public Class<?> getValueClass() { return data.getValueClass(); }

    public static interface Option extends SequenceFile.Reader.Option {}

    /** Option supplying the comparator used to order and look up keys. */
    public static Option comparator(WritableComparator value) {
      return new ComparatorOption(value);
    }

    static class ComparatorOption implements Option {
      private final WritableComparator value;
      ComparatorOption(WritableComparator value) {
        this.value = value;
      }
      WritableComparator getValue() {
        return value;
      }
    }

    /**
     * Open the map stored in directory <code>dir</code>.
     *
     * @param dir directory holding the data and index files
     * @param conf configuration; also supplies <code>io.map.index.skip</code>
     * @param opts reader options, optionally including a comparator option
     * @throws IOException if either file cannot be opened
     */
    public Reader(Path dir, Configuration conf,
                  SequenceFile.Reader.Option... opts) throws IOException {
      ComparatorOption comparatorOption =
        Options.getOption(ComparatorOption.class, opts);
      WritableComparator comparator =
        comparatorOption == null ? null : comparatorOption.getValue();
      INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
      open(dir, comparator, conf, opts);
    }

    /** Construct a map reader for the named map.
     * @deprecated Use Reader(Path, Configuration, Option...) instead.
     */
    @Deprecated
    public Reader(FileSystem fs, String dirName,
                  Configuration conf) throws IOException {
      this(new Path(dirName), conf);
    }

    /** Construct a map reader for the named map using the named comparator.
     * @deprecated Use Reader(Path, Configuration, Option...) instead.
     */
    @Deprecated
    public Reader(FileSystem fs, String dirName, WritableComparator comparator,
                  Configuration conf) throws IOException {
      this(new Path(dirName), conf, comparator(comparator));
    }

    /**
     * Opens the data and index readers for the map in <code>dir</code>.
     *
     * @param comparator comparator to use, or null to derive one from the
     *                   data file's key class
     */
    protected synchronized void open(Path dir,
                                     WritableComparator comparator,
                                     Configuration conf,
                                     SequenceFile.Reader.Option... options
                                     ) throws IOException {
      Path dataFile = new Path(dir, DATA_FILE_NAME);
      Path indexFile = new Path(dir, INDEX_FILE_NAME);

      // open the data
      this.data = createDataFileReader(dataFile, conf, options);

      // Do not leak the already-open data reader if anything below fails.
      boolean opened = false;
      try {
        this.firstPosition = data.getPosition();

        if (comparator == null) {
          Class<? extends WritableComparable> cls;
          cls = data.getKeyClass().asSubclass(WritableComparable.class);
          this.comparator = WritableComparator.get(cls, conf);
        } else {
          this.comparator = comparator;
        }

        // open the index
        SequenceFile.Reader.Option[] indexOptions =
          Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
        this.index = new SequenceFile.Reader(conf, indexOptions);
        opened = true;
      } finally {
        if (!opened) {
          IOUtils.closeStream(this.data);
        }
      }
    }

    /**
     * Override this method to specialize the type of
     * {@link SequenceFile.Reader} returned.
     */
    protected SequenceFile.Reader
      createDataFileReader(Path dataFile, Configuration conf,
                           SequenceFile.Reader.Option... options
                           ) throws IOException {
      SequenceFile.Reader.Option[] newOptions =
        Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
      return new SequenceFile.Reader(conf, newOptions);
    }

    /**
     * Loads the index into memory on first use and closes the index reader.
     * A truncated index (EOF in the middle of an entry) is tolerated: the
     * entries read up to that point are kept.
     *
     * @throws IOException if the index keys are not in comparator order or
     *                     the index cannot be read
     */
    private void readIndex() throws IOException {
      // read the index entirely into memory
      if (this.keys != null)
        return;
      this.count = 0;
      this.positions = new long[1024];

      try {
        ArrayList<WritableComparable> keyBuilder =
          new ArrayList<WritableComparable>(1024);
        try {
          int skip = INDEX_SKIP;
          LongWritable position = new LongWritable();
          WritableComparable lastKey = null;
          long lastIndex = -1;
          while (true) {
            WritableComparable k = comparator.newKey();

            if (!index.next(k, position))
              break;

            // check order to make sure comparator is compatible
            if (lastKey != null && comparator.compare(lastKey, k) > 0)
              throw new IOException("key out of order: "+k+" after "+lastKey);
            lastKey = k;
            if (skip > 0) {
              skip--;
              continue;                             // skip this entry
            } else {
              skip = INDEX_SKIP;                    // reset skip
            }

            // don't read an index that is the same as the previous one. Block
            // compressed map files used to do this (multiple entries would
            // point at the same block)
            if (position.get() == lastIndex)
              continue;

            if (count == positions.length) {
              positions = Arrays.copyOf(positions, positions.length * 2);
            }

            keyBuilder.add(k);
            positions[count] = position.get();
            // remember the accepted position so the duplicate check above
            // can actually detect a repeat
            lastIndex = position.get();
            count++;
          }
        } catch (EOFException e) {
          LOG.warn("Unexpected EOF reading " + index +
                   " at entry #" + count + ". Ignoring.");
        }

        // Publish whatever was read, also after a truncated index, so that
        // keys is never left null while count is positive.
        this.keys = keyBuilder.toArray(new WritableComparable[count]);
        this.positions = Arrays.copyOf(positions, count);
      } finally {
        indexClosed = true;
        index.close();
      }
    }

    /** Re-positions the reader before its first key. */
    public synchronized void reset() throws IOException {
      data.seek(firstPosition);
    }

    /** Get the key at approximately the middle of the file. Or null if the
     *  file is empty.
     */
    public synchronized WritableComparable midKey() throws IOException {

      readIndex();
      if (count == 0) {
        return null;
      }

      return keys[(count - 1) / 2];
    }

    /** Reads the final key from the file.
     *
     * @param key key to read into
     */
    public synchronized void finalKey(WritableComparable key)
      throws IOException {

      long originalPosition = data.getPosition(); // save position
      try {
        readIndex();                              // make sure index is valid
        if (count > 0) {
          data.seek(positions[count-1]);          // skip to last indexed entry
        } else {
          reset();                                // start at the beginning
        }
        while (data.next(key)) {}                 // scan to eof

      } finally {
        data.seek(originalPosition);              // restore position
      }
    }

    /** Positions the reader at the named key, or if none such exists, at the
     * first entry after the named key.  Returns true iff the named key exists
     * in this map.
     */
    public synchronized boolean seek(WritableComparable key) throws IOException {
      return seekInternal(key) == 0;
    }

    /**
     * Positions the reader at the named key, or if none such exists, at the
     * first entry after the named key.
     *
     * @return  0   - exact match found
     *          &lt; 0 - positioned at next record
     *          1   - no more records in file
     */
    private synchronized int seekInternal(WritableComparable key)
      throws IOException {
      return seekInternal(key, false);
    }

    /**
     * Positions the reader at the named key, or if none such exists, at the
     * key that falls just before or just after dependent on how the
     * <code>before</code> parameter is set.
     *
     * @param before - IF true, and <code>key</code> does not exist, position
     * file at entry that falls just before <code>key</code>.  Otherwise,
     * position file at record that sorts just after.
     * @return  0   - exact match found
     *          &lt; 0 - positioned at next record
     *          1   - no more records in file
     */
    private synchronized int seekInternal(WritableComparable key,
        final boolean before)
      throws IOException {
      readIndex();                                // make sure index is read

      if (seekIndex != -1                         // seeked before
          && seekIndex+1 < count
          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
          && comparator.compare(key, nextKey)
          >= 0) {                                 // but after last seeked
        // do nothing
      } else {
        seekIndex = binarySearch(key);
        if (seekIndex < 0)                        // decode insertion point
          seekIndex = -seekIndex-2;

        if (seekIndex == -1)                      // belongs before first entry
          seekPosition = firstPosition;           // use beginning of file
        else
          seekPosition = positions[seekIndex];    // else use index
      }
      data.seek(seekPosition);

      if (nextKey == null)
        nextKey = comparator.newKey();

      // If we're looking for the key before, we need to keep track
      // of the position we got the current key as well as the position
      // of the key before it.
      long prevPosition = -1;
      long curPosition = seekPosition;

      while (data.next(nextKey)) {
        int c = comparator.compare(key, nextKey);
        if (c <= 0) {                             // at or beyond desired
          if (before && c != 0) {
            if (prevPosition == -1) {
              // We're on the first record of this index block
              // and we've already passed the search key. Therefore
              // we must be at the beginning of the file, so seek
              // to the beginning of this block and return c
              data.seek(curPosition);
            } else {
              // We have a previous record to back up to
              data.seek(prevPosition);
              data.next(nextKey);
              // now that we've rewound, the search key must be greater than this key
              return 1;
            }
          }
          return c;
        }
        if (before) {
          prevPosition = curPosition;
          curPosition = data.getPosition();
        }
      }

      return 1;
    }

    /**
     * Binary search over the in-memory index keys.
     *
     * @return index of the matching key, or <code>-(insertionPoint + 1)</code>
     *         when the key is not indexed
     */
    private int binarySearch(WritableComparable key) {
      int low = 0;
      int high = count-1;

      while (low <= high) {
        int mid = (low + high) >>> 1;
        WritableComparable midVal = keys[mid];
        int cmp = comparator.compare(midVal, key);

        if (cmp < 0)
          low = mid + 1;
        else if (cmp > 0)
          high = mid - 1;
        else
          return mid;                             // key found
      }
      return -(low + 1);                          // key not found.
    }

    /** Read the next key/value pair in the map into <code>key</code> and
     * <code>val</code>.  Returns true if such a pair exists and false when at
     * the end of the map */
    public synchronized boolean next(WritableComparable key, Writable val)
      throws IOException {
      return data.next(key, val);
    }

    /** Return the value for the named key, or null if none exists. */
    public synchronized Writable get(WritableComparable key, Writable val)
      throws IOException {
      if (seek(key)) {
        data.getCurrentValue(val);
        return val;
      } else
        return null;
    }

    /**
     * Finds the record that is the closest match to the specified key.
     * Returns <code>key</code> or if it does not exist, at the first entry
     * after the named key.
     *
     * @param key       - key that we're trying to find
     * @param val       - data value if key is found
     * @return          - the key that was the closest match or null if eof.
     */
    public synchronized WritableComparable getClosest(WritableComparable key,
      Writable val)
    throws IOException {
      return getClosest(key, val, false);
    }

    /**
     * Finds the record that is the closest match to the specified key.
     *
     * @param key       - key that we're trying to find
     * @param val       - data value if key is found
     * @param before    - IF true, and <code>key</code> does not exist, return
     * the first entry that falls just before the <code>key</code>.  Otherwise,
     * return the record that sorts just after.
     * @return          - the key that was the closest match or null if eof.
     */
    public synchronized WritableComparable getClosest(WritableComparable key,
        Writable val, final boolean before)
      throws IOException {

      int c = seekInternal(key, before);

      // If we didn't get an exact match, and we ended up in the wrong
      // direction relative to the query key, return null since we
      // must be at the beginning or end of the file.
      if ((!before && c > 0) ||
          (before && c < 0)) {
        return null;
      }

      data.getCurrentValue(val);
      return nextKey;
    }

    /** Close the map. */
    @Override
    public synchronized void close() throws IOException {
      if (!indexClosed) {
        index.close();
      }
      data.close();
    }

  }

  /** Renames an existing map directory. */
  public static void rename(FileSystem fs, String oldName, String newName)
    throws IOException {
    Path oldDir = new Path(oldName);
    Path newDir = new Path(newName);
    if (!fs.rename(oldDir, newDir)) {
      throw new IOException("Could not rename " + oldDir + " to " + newDir);
    }
  }

  /** Deletes the named map file. */
  public static void delete(FileSystem fs, String name) throws IOException {
    Path dir = new Path(name);
    Path data = new Path(dir, DATA_FILE_NAME);
    Path index = new Path(dir, INDEX_FILE_NAME);

    fs.delete(data, true);
    fs.delete(index, true);
    fs.delete(dir, true);
  }

  /**
   * This method attempts to fix a corrupt MapFile by re-creating its index.
   * @param fs filesystem
   * @param dir directory containing the MapFile data and index
   * @param keyClass key class (has to be a subclass of Writable)
   * @param valueClass value class (has to be a subclass of Writable)
   * @param dryrun do not perform any changes, just report what needs to be done
   * @param conf configuration; supplies the index interval
   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
   * @throws Exception if the data file is missing or its key/value classes do
   *                   not match the expected ones
   */
  public static long fix(FileSystem fs, Path dir,
                         Class<? extends Writable> keyClass,
                         Class<? extends Writable> valueClass, boolean dryrun,
                         Configuration conf) throws Exception {
    String dr = (dryrun ? "[DRY RUN ] " : "");
    Path data = new Path(dir, DATA_FILE_NAME);
    Path index = new Path(dir, INDEX_FILE_NAME);
    int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
    if (!fs.exists(data)) {
      // there's nothing we can do to fix this!
      throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
    }
    if (fs.exists(index)) {
      // no fixing needed
      return -1;
    }
    long cnt = 0L;
    SequenceFile.Reader dataReader =
      new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
    try {
      if (!dataReader.getKeyClass().equals(keyClass)) {
        throw new Exception(dr + "Wrong key class in " + dir + ", expected " + keyClass.getName() +
                            ", got " + dataReader.getKeyClass().getName());
      }
      if (!dataReader.getValueClass().equals(valueClass)) {
        throw new Exception(dr + "Wrong value class in " + dir + ", expected " + valueClass.getName() +
                            ", got " + dataReader.getValueClass().getName());
      }
      Writable key = ReflectionUtils.newInstance(keyClass, conf);
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      SequenceFile.Writer indexWriter = null;
      if (!dryrun) {
        indexWriter =
          SequenceFile.createWriter(conf,
                                    SequenceFile.Writer.file(index),
                                    SequenceFile.Writer.keyClass(keyClass),
                                    SequenceFile.Writer.valueClass
                                      (LongWritable.class));
      }
      try {
        long pos = 0L;
        LongWritable position = new LongWritable();
        while (dataReader.next(key, value)) {
          cnt++;
          if (cnt % indexInterval == 0) {
            position.set(pos);
            if (!dryrun) indexWriter.append(key, position);
          }
          pos = dataReader.getPosition();
        }
      } catch (Throwable t) {
        // Deliberately best-effort: a truncated data file ends the scan and
        // the entries read so far are still reported. Log instead of hiding it.
        LOG.warn(dr + "Stopped scanning " + data + " after " + cnt
                 + " entries", t);
      } finally {
        if (indexWriter != null) {
          indexWriter.close();
        }
      }
    } finally {
      // always release the data reader, including on class mismatch
      dataReader.close();
    }
    return cnt;
  }

  /**
   * Class to merge multiple MapFiles of same Key and Value types to one MapFile
   */
  public static class Merger {
    private Configuration conf;
    private WritableComparator comparator = null;
    private Reader[] inReaders;
    private Writer outWriter;
    private Class<Writable> valueClass = null;
    private Class<WritableComparable> keyClass = null;

    /** Creates a merger that uses <code>conf</code> for all file access. */
    public Merger(Configuration conf) throws IOException {
      this.conf = conf;
    }

    /**
     * Merge multiple MapFiles to one Mapfile
     *
     * @param inMapFiles input map directories
     * @param deleteInputs whether to delete the inputs after a successful merge
     * @param outMapFile output map directory
     * @throws IOException
     */
    public void merge(Path[] inMapFiles, boolean deleteInputs,
        Path outMapFile) throws IOException {
      try {
        open(inMapFiles, outMapFile);
        mergePass();
      } finally {
        close();
      }
      if (deleteInputs) {
        for (int i = 0; i < inMapFiles.length; i++) {
          Path path = inMapFiles[i];
          delete(path.getFileSystem(conf), path.toString());
        }
      }
    }

    /*
     * Open all input files for reading and verify the key and value types. And
     * open Output file for writing
     */
    @SuppressWarnings("unchecked")
    private void open(Path[] inMapFiles, Path outMapFile) throws IOException {
      inReaders = new Reader[inMapFiles.length];
      for (int i = 0; i < inMapFiles.length; i++) {
        Reader reader = new Reader(inMapFiles[i], conf);
        if (keyClass == null || valueClass == null) {
          keyClass = (Class<WritableComparable>) reader.getKeyClass();
          valueClass = (Class<Writable>) reader.getValueClass();
        } else if (keyClass != reader.getKeyClass()
            || valueClass != reader.getValueClass()) {
          throw new HadoopIllegalArgumentException(
              "Input files cannot be merged as they"
                  + " have different Key and Value classes");
        }
        inReaders[i] = reader;
      }

      if (comparator == null) {
        Class<? extends WritableComparable> cls;
        cls = keyClass.asSubclass(WritableComparable.class);
        this.comparator = WritableComparator.get(cls, conf);
      } else if (comparator.getKeyClass() != keyClass) {
        throw new HadoopIllegalArgumentException(
            "Input files cannot be merged as they"
                + " have different Key class compared to"
                + " specified comparator");
      }

      outWriter = new MapFile.Writer(conf, outMapFile,
          MapFile.Writer.keyClass(keyClass),
          MapFile.Writer.valueClass(valueClass));
    }

    /**
     * Merge all input files to output map file.<br>
     * 1. Read first key/value from all input files to keys/values array. <br>
     * 2. Select the least key and corresponding value. <br>
     * 3. Write the selected key and value to output file. <br>
     * 4. Replace the already written key/value in keys/values arrays with the
     * next key/value from the selected input <br>
     * 5. Repeat step 2-4 till all keys are read. <br>
     */
    private void mergePass() throws IOException {
      // re-usable array
      WritableComparable[] keys = new WritableComparable[inReaders.length];
      Writable[] values = new Writable[inReaders.length];
      // Read first key/value from all inputs
      for (int i = 0; i < inReaders.length; i++) {
        keys[i] = ReflectionUtils.newInstance(keyClass, null);
        values[i] = ReflectionUtils.newInstance(valueClass, null);
        if (!inReaders[i].next(keys[i], values[i])) {
          // Handle empty files
          keys[i] = null;
          values[i] = null;
        }
      }

      do {
        int currentEntry = -1;
        WritableComparable currentKey = null;
        Writable currentValue = null;
        for (int i = 0; i < keys.length; i++) {
          if (keys[i] == null) {
            // Skip Readers reached EOF
            continue;
          }
          if (currentKey == null || comparator.compare(currentKey, keys[i]) > 0) {
            currentEntry = i;
            currentKey = keys[i];
            currentValue = values[i];
          }
        }
        if (currentKey == null) {
          // Merge Complete
          break;
        }
        // Write the selected key/value to merge stream
        outWriter.append(currentKey, currentValue);
        // Replace the already written key/value in keys/values arrays with the
        // next key/value from the selected input
        if (!inReaders[currentEntry].next(keys[currentEntry],
            values[currentEntry])) {
          // EOF for this file
          keys[currentEntry] = null;
          values[currentEntry] = null;
        }
      } while (true);
    }

    /** Closes all input readers and the output writer, if they were opened. */
    private void close() throws IOException {
      // inReaders is still null when open() failed before allocating it;
      // guarding here keeps that original failure from being masked by an NPE.
      if (inReaders != null) {
        for (int i = 0; i < inReaders.length; i++) {
          IOUtils.closeStream(inReaders[i]);
          inReaders[i] = null;
        }
      }
      if (outWriter != null) {
        outWriter.close();
        outWriter = null;
      }
    }
  }

  /**
   * Copies the map in the local directory <code>args[0]</code> into a new map
   * in the local directory <code>args[1]</code>.
   */
  public static void main(String[] args) throws Exception {
    String usage = "Usage: MapFile inFile outFile";

    if (args.length != 2) {
      System.err.println(usage);
      System.exit(-1);
    }

    String in = args[0];
    String out = args[1];

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    MapFile.Reader reader = null;
    MapFile.Writer writer = null;
    try {
      reader = new MapFile.Reader(fs, in, conf);
      writer =
        new MapFile.Writer(conf, fs, out,
            reader.getKeyClass().asSubclass(WritableComparable.class),
            reader.getValueClass());

      WritableComparable key = ReflectionUtils.newInstance(reader.getKeyClass()
        .asSubclass(WritableComparable.class), conf);
      Writable value = ReflectionUtils.newInstance(reader.getValueClass()
        .asSubclass(Writable.class), conf);

      while (reader.next(key, value))               // copy all entries
        writer.append(key, value);
    } finally {
      IOUtils.cleanup(LOG, writer, reader);
    }
  }
}