/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAP_INDEX_SKIP_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAP_INDEX_SKIP_KEY;

/** A file-based map from keys to values.
 *
 * <p>A map is a directory containing two files, the <code>data</code> file,
 * containing all keys and values in the map, and a smaller <code>index</code>
 * file, containing a fraction of the keys. The fraction is determined by
 * {@link Writer#getIndexInterval()}.
 *
 * <p>The index file is read entirely into memory. Thus key implementations
 * should try to keep themselves small.
 *
 * <p>Map files are created by adding entries in-order. To maintain a large
 * database, perform updates by copying the previous version of a database and
 * merging in a sorted change list, to create a new version of the database in
 * a new file. Sorting large change lists can be done with {@link
 * SequenceFile.Sorter}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MapFile {
  private static final Log LOG = LogFactory.getLog(MapFile.class);

  /** The name of the index file. */
  public static final String INDEX_FILE_NAME = "index";

  /** The name of the data file. */
  public static final String DATA_FILE_NAME = "data";

  protected MapFile() {}                          // no public ctor

  /** Writes a new map. */
  public static class Writer implements java.io.Closeable {
    private SequenceFile.Writer data;
    private SequenceFile.Writer index;

    private static final String INDEX_INTERVAL = "io.map.index.interval";
    private int indexInterval = 128;

    /** Number of entries appended so far. */
    private long size;
    private LongWritable position = new LongWritable();

    // the following fields are used only for checking key order
    private WritableComparator comparator;
    private DataInputBuffer inBuf = new DataInputBuffer();
    private DataOutputBuffer outBuf = new DataOutputBuffer();
    private WritableComparable lastKey;

    /** What's the position (in bytes) we wrote when we got the last index */
    private long lastIndexPos = -1;

    /**
     * What was size when we last wrote an index. Starts at MIN_VALUE so that
     * the interval test in {@link #append} passes for the very first entry,
     * i.e. the first appended key always gets an index entry.
     */
    private long lastIndexKeyCount = Long.MIN_VALUE;


    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass,
                  Class valClass) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
    }

    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
           compression(compress), progressable(progress));
    }

    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress, CompressionCodec codec,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
           compression(compress, codec), progressable(progress));
    }

    /** Create the named map for keys of the named class.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  Class<? extends WritableComparable> keyClass, Class valClass,
                  CompressionType compress) throws IOException {
      this(conf, new Path(dirName), keyClass(keyClass),
           valueClass(valClass), compression(compress));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass
                  ) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass), compression(compress));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass), compression(compress),
           progressable(progress));
    }

    /** Create the named map using the named key comparator.
     * @deprecated Use Writer(Configuration, Path, Option...) instead.
     */
    @Deprecated
    public Writer(Configuration conf, FileSystem fs, String dirName,
                  WritableComparator comparator, Class valClass,
                  SequenceFile.CompressionType compress, CompressionCodec codec,
                  Progressable progress) throws IOException {
      this(conf, new Path(dirName), comparator(comparator),
           valueClass(valClass), compression(compress, codec),
           progressable(progress));
    }

    // our options are a superset of sequence file writer options
    public static interface Option extends SequenceFile.Writer.Option { }

    private static class KeyClassOption extends Options.ClassOption
                                        implements Option {
      KeyClassOption(Class<?> value) {
        super(value);
      }
    }

    private static class ComparatorOption implements Option {
      private final WritableComparator value;
      ComparatorOption(WritableComparator value) {
        this.value = value;
      }
      WritableComparator getValue() {
        return value;
      }
    }

    public static Option keyClass(Class<? extends WritableComparable> value) {
      return new KeyClassOption(value);
    }

    public static Option comparator(WritableComparator value) {
      return new ComparatorOption(value);
    }

    public static SequenceFile.Writer.Option valueClass(Class<?> value) {
      return SequenceFile.Writer.valueClass(value);
    }

    public static
    SequenceFile.Writer.Option compression(CompressionType type) {
      return SequenceFile.Writer.compression(type);
    }

    public static
    SequenceFile.Writer.Option compression(CompressionType type,
        CompressionCodec codec) {
      return SequenceFile.Writer.compression(type, codec);
    }

    public static SequenceFile.Writer.Option progressable(Progressable value) {
      return SequenceFile.Writer.progressable(value);
    }

    /**
     * Create a map in directory <code>dirName</code>.
     *
     * <p>Exactly one of the {@link #keyClass(Class)} and
     * {@link #comparator(WritableComparator)} options must be supplied; the
     * remaining options are passed on to the underlying sequence file writers.
     *
     * @param conf configuration; also supplies the index interval
     * @param dirName directory that will hold the data and index files
     * @param opts writer options
     * @throws IllegalArgumentException if neither or both of the key class and
     *         comparator options are given
     * @throws IOException if the directory or either file cannot be created
     */
    @SuppressWarnings("unchecked")
    public Writer(Configuration conf,
                  Path dirName,
                  SequenceFile.Writer.Option... opts
                  ) throws IOException {
      KeyClassOption keyClassOption =
        Options.getOption(KeyClassOption.class, opts);
      ComparatorOption comparatorOption =
        Options.getOption(ComparatorOption.class, opts);
      if ((keyClassOption == null) == (comparatorOption == null)) {
        throw new IllegalArgumentException("key class or comparator option "
                                           + "must be set");
      }
      this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);

      Class<? extends WritableComparable> keyClass;
      if (keyClassOption == null) {
        this.comparator = comparatorOption.getValue();
        keyClass = comparator.getKeyClass();
      } else {
        keyClass =
          (Class<? extends WritableComparable>) keyClassOption.getValue();
        this.comparator = WritableComparator.get(keyClass, conf);
      }
      this.lastKey = comparator.newKey();
      FileSystem fs = dirName.getFileSystem(conf);

      if (!fs.mkdirs(dirName)) {
        throw new IOException("Mkdirs failed to create directory " + dirName);
      }
      Path dataFile = new Path(dirName, DATA_FILE_NAME);
      Path indexFile = new Path(dirName, INDEX_FILE_NAME);

      SequenceFile.Writer.Option[] dataOptions =
        Options.prependOptions(opts,
                               SequenceFile.Writer.file(dataFile),
                               SequenceFile.Writer.keyClass(keyClass));
      this.data = SequenceFile.createWriter(conf, dataOptions);

      boolean indexCreated = false;
      try {
        SequenceFile.Writer.Option[] indexOptions =
          Options.prependOptions(opts, SequenceFile.Writer.file(indexFile),
              SequenceFile.Writer.keyClass(keyClass),
              SequenceFile.Writer.valueClass(LongWritable.class),
              SequenceFile.Writer.compression(CompressionType.BLOCK));
        this.index = SequenceFile.createWriter(conf, indexOptions);
        indexCreated = true;
      } finally {
        if (!indexCreated) {
          // do not leak the already opened data writer
          IOUtils.closeStream(data);
        }
      }
    }

    /** The number of entries that are added before an index entry is added.*/
    public int getIndexInterval() { return indexInterval; }

    /** Sets the index interval.
     * @see #getIndexInterval()
     */
    public void setIndexInterval(int interval) { indexInterval = interval; }

    /** Sets the index interval and stores it in conf
     * @see #getIndexInterval()
     */
    public static void setIndexInterval(Configuration conf, int interval) {
      conf.setInt(INDEX_INTERVAL, interval);
    }

    /** Close the map. The index is closed even if closing the data fails. */
    @Override
    public synchronized void close() throws IOException {
      try {
        data.close();
      } finally {
        index.close();
      }
    }

    /** Append a key/value pair to the map. The key must be greater or equal
     * to the previous key added to the map. */
    public synchronized void append(WritableComparable key, Writable val)
      throws IOException {

      checkKey(key);

      long pos = data.getLength();
      // Only write an index if we've changed positions. In a block compressed
      // file, this means we write an entry at the start of each block
      if (size >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
        position.set(pos);                        // point to current eof
        index.append(key, position);
        lastIndexPos = pos;
        lastIndexKeyCount = size;
      }

      data.append(key, val);                      // append key/value to data
      size++;
    }

    /**
     * Verifies that <code>key</code> does not sort before the previously
     * appended key, then remembers a copy of it for the next check.
     */
    private void checkKey(WritableComparable key) throws IOException {
      // check that keys are well-ordered
      if (size != 0 && comparator.compare(lastKey, key) > 0)
        throw new IOException("key out of order: "+key+" after "+lastKey);

      // update lastKey with a copy of key by writing and reading
      outBuf.reset();
      key.write(outBuf);                          // write new key

      inBuf.reset(outBuf.getData(), outBuf.getLength());
      lastKey.readFields(inBuf);                  // read into lastKey
    }

  }

  /** Provide access to an existing map. */
  public static class Reader implements java.io.Closeable {

    /** Number of index entries to skip between each entry. Zero by default.
     * Setting this to values larger than zero can facilitate opening large map
     * files using less memory. */
    private int indexSkip = 0;

    private WritableComparator comparator;

    private WritableComparable nextKey;
    private long seekPosition = -1;
    private int seekIndex = -1;
    private long firstPosition;

    // the data, on disk
    private SequenceFile.Reader data;
    private SequenceFile.Reader index;

    // whether the index Reader was closed
    private boolean indexClosed = false;

    // the index, in memory
    private int count = -1;
    private WritableComparable[] keys;
    private long[] positions;

    /** Returns the class of keys in this file. */
    public Class<?> getKeyClass() { return data.getKeyClass(); }

    /** Returns the class of values in this file. */
    public Class<?> getValueClass() { return data.getValueClass(); }

    public static interface Option extends SequenceFile.Reader.Option {}

    public static Option comparator(WritableComparator value) {
      return new ComparatorOption(value);
    }

    static class ComparatorOption implements Option {
      private final WritableComparator value;
      ComparatorOption(WritableComparator value) {
        this.value = value;
      }
      WritableComparator getValue() {
        return value;
      }
    }

    /**
     * Construct a map reader for the map stored in directory <code>dir</code>.
     *
     * @param dir directory holding the data and index files
     * @param conf configuration; also supplies the index skip setting
     * @param opts reader options, optionally including a
     *        {@link #comparator(WritableComparator)}
     * @throws IOException if either file cannot be opened
     */
    public Reader(Path dir, Configuration conf,
                  SequenceFile.Reader.Option... opts) throws IOException {
      ComparatorOption comparatorOption =
        Options.getOption(ComparatorOption.class, opts);
      WritableComparator comparator =
        comparatorOption == null ? null : comparatorOption.getValue();
      indexSkip = conf.getInt(
          IO_MAP_INDEX_SKIP_KEY, IO_MAP_INDEX_SKIP_DEFAULT);
      open(dir, comparator, conf, opts);
    }

    /** Construct a map reader for the named map.
     * @deprecated Use Reader(Path, Configuration, Option...) instead.
     */
    @Deprecated
    public Reader(FileSystem fs, String dirName,
                  Configuration conf) throws IOException {
      this(new Path(dirName), conf);
    }

    /** Construct a map reader for the named map using the named comparator.
     * @deprecated Use Reader(Path, Configuration, Option...) instead.
     */
    @Deprecated
    public Reader(FileSystem fs, String dirName, WritableComparator comparator,
                  Configuration conf) throws IOException {
      this(new Path(dirName), conf, comparator(comparator));
    }

    /**
     * Opens the data and index files found in <code>dir</code>. When no
     * comparator is given, one is derived from the key class recorded in the
     * data file.
     */
    protected synchronized void open(Path dir,
                                     WritableComparator comparator,
                                     Configuration conf,
                                     SequenceFile.Reader.Option... options
                                     ) throws IOException {
      Path dataFile = new Path(dir, DATA_FILE_NAME);
      Path indexFile = new Path(dir, INDEX_FILE_NAME);

      // open the data
      this.data = createDataFileReader(dataFile, conf, options);
      boolean indexOpened = false;
      try {
        this.firstPosition = data.getPosition();

        if (comparator == null) {
          Class<? extends WritableComparable> cls;
          cls = data.getKeyClass().asSubclass(WritableComparable.class);
          this.comparator = WritableComparator.get(cls, conf);
        } else {
          this.comparator = comparator;
        }

        // open the index
        SequenceFile.Reader.Option[] indexOptions =
          Options.prependOptions(options, SequenceFile.Reader.file(indexFile));
        this.index = new SequenceFile.Reader(conf, indexOptions);
        indexOpened = true;
      } finally {
        if (!indexOpened) {
          // do not leak the already opened data reader
          IOUtils.closeStream(data);
        }
      }
    }

    /**
     * Override this method to specialize the type of
     * {@link SequenceFile.Reader} returned.
     */
    protected SequenceFile.Reader
      createDataFileReader(Path dataFile, Configuration conf,
                           SequenceFile.Reader.Option... options
                           ) throws IOException {
      SequenceFile.Reader.Option[] newOptions =
        Options.prependOptions(options, SequenceFile.Reader.file(dataFile));
      return new SequenceFile.Reader(conf, newOptions);
    }

    /**
     * Loads the index into {@link #keys} and {@link #positions} on first use
     * and closes the index file. Subsequent calls are no-ops once the index
     * has been loaded.
     */
    private void readIndex() throws IOException {
      // read the index entirely into memory
      if (this.keys != null)
        return;
      this.count = 0;
      this.positions = new long[1024];

      try {
        ArrayList<WritableComparable> keyBuilder =
          new ArrayList<WritableComparable>(1024);
        try {
          int skip = indexSkip;
          LongWritable position = new LongWritable();
          WritableComparable lastKey = null;
          long lastIndex = -1;
          while (true) {
            WritableComparable k = comparator.newKey();

            if (!index.next(k, position))
              break;

            // check order to make sure comparator is compatible
            if (lastKey != null && comparator.compare(lastKey, k) > 0)
              throw new IOException("key out of order: "+k+" after "+lastKey);
            lastKey = k;
            if (skip > 0) {
              skip--;
              continue;                           // skip this entry
            } else {
              skip = indexSkip;                   // reset skip
            }

            // don't read an index that is the same as the previous one. Block
            // compressed map files used to do this (multiple entries would point
            // at the same block)
            if (position.get() == lastIndex)
              continue;

            if (count == positions.length) {
              positions = Arrays.copyOf(positions, positions.length * 2);
            }

            keyBuilder.add(k);
            positions[count] = position.get();
            lastIndex = position.get();           // remember for the dedup check
            count++;
          }
        } catch (EOFException e) {
          LOG.warn("Unexpected EOF reading " + index +
                   " at entry #" + count + ". Ignoring.");
        }

        // Publish whatever was read, including after a truncated index, so
        // that count, keys and positions always agree with each other.
        this.keys = keyBuilder.toArray(new WritableComparable[count]);
        positions = Arrays.copyOf(positions, count);
      } finally {
        indexClosed = true;
        index.close();
      }
    }

    /** Re-positions the reader before its first key. */
    public synchronized void reset() throws IOException {
      data.seek(firstPosition);
    }

    /** Get the key at approximately the middle of the file. Or null if the
     * file is empty.
     */
    public synchronized WritableComparable midKey() throws IOException {

      readIndex();
      if (count == 0) {
        return null;
      }

      return keys[(count - 1) / 2];
    }

    /** Reads the final key from the file.
     *
     * @param key key to read into
     */
    public synchronized void finalKey(WritableComparable key)
      throws IOException {

      long originalPosition = data.getPosition(); // save position
      try {
        readIndex();                              // make sure index is valid
        if (count > 0) {
          data.seek(positions[count-1]);          // skip to last indexed entry
        } else {
          reset();                                // start at the beginning
        }
        while (data.next(key)) {}                 // scan to eof

      } finally {
        data.seek(originalPosition);              // restore position
      }
    }

    /** Positions the reader at the named key, or if none such exists, at the
     * first entry after the named key. Returns true iff the named key exists
     * in this map.
     */
    public synchronized boolean seek(WritableComparable key) throws IOException {
      return seekInternal(key) == 0;
    }

    /**
     * Positions the reader at the named key, or if none such exists, at the
     * first entry after the named key.
     *
     * @return  0   - exact match found
     *          &lt; 0 - positioned at next record
     *          1   - no more records in file
     */
    private synchronized int seekInternal(WritableComparable key)
      throws IOException {
      return seekInternal(key, false);
    }

    /**
     * Positions the reader at the named key, or if none such exists, at the
     * key that falls just before or just after dependent on how the
     * <code>before</code> parameter is set.
     *
     * @param before - IF true, and <code>key</code> does not exist, position
     * file at entry that falls just before <code>key</code>. Otherwise,
     * position file at record that sorts just after.
     * @return  0   - exact match found
     *          &lt; 0 - positioned at next record
     *          1   - no more records in file
     */
    private synchronized int seekInternal(WritableComparable key,
        final boolean before)
      throws IOException {
      readIndex();                                // make sure index is read

      if (seekIndex != -1                         // seeked before
          && seekIndex+1 < count
          && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
          && comparator.compare(key, nextKey)
          >= 0) {                                 // but after last seeked
        // do nothing
      } else {
        seekIndex = binarySearch(key);
        if (seekIndex < 0)                        // decode insertion point
          seekIndex = -seekIndex-2;

        if (seekIndex == -1)                      // belongs before first entry
          seekPosition = firstPosition;           // use beginning of file
        else
          seekPosition = positions[seekIndex];    // else use index
      }
      data.seek(seekPosition);

      if (nextKey == null)
        nextKey = comparator.newKey();

      // If we're looking for the key before, we need to keep track
      // of the position we got the current key as well as the position
      // of the key before it.
      long prevPosition = -1;
      long curPosition = seekPosition;

      while (data.next(nextKey)) {
        int c = comparator.compare(key, nextKey);
        if (c <= 0) {                             // at or beyond desired
          if (before && c != 0) {
            if (prevPosition == -1) {
              // We're on the first record of this index block
              // and we've already passed the search key. Therefore
              // we must be at the beginning of the file, so seek
              // to the beginning of this block and return c
              data.seek(curPosition);
            } else {
              // We have a previous record to back up to
              data.seek(prevPosition);
              data.next(nextKey);
              // now that we've rewound, the search key must be greater than this key
              return 1;
            }
          }
          return c;
        }
        if (before) {
          prevPosition = curPosition;
          curPosition = data.getPosition();
        }
      }

      return 1;
    }

    /**
     * Binary search of the in-memory index.
     *
     * @return the index of <code>key</code> if it is an indexed key, otherwise
     *         <code>-(insertionPoint + 1)</code>
     */
    private int binarySearch(WritableComparable key) {
      int low = 0;
      int high = count-1;

      while (low <= high) {
        int mid = (low + high) >>> 1;
        WritableComparable midVal = keys[mid];
        int cmp = comparator.compare(midVal, key);

        if (cmp < 0)
          low = mid + 1;
        else if (cmp > 0)
          high = mid - 1;
        else
          return mid;                             // key found
      }
      return -(low + 1);                          // key not found.
    }

    /** Read the next key/value pair in the map into <code>key</code> and
     * <code>val</code>. Returns true if such a pair exists and false when at
     * the end of the map */
    public synchronized boolean next(WritableComparable key, Writable val)
      throws IOException {
      return data.next(key, val);
    }

    /** Return the value for the named key, or null if none exists. */
    public synchronized Writable get(WritableComparable key, Writable val)
      throws IOException {
      if (seek(key)) {
        data.getCurrentValue(val);
        return val;
      } else
        return null;
    }

    /**
     * Finds the record that is the closest match to the specified key.
     * Returns <code>key</code> or if it does not exist, at the first entry
     * after the named key.
     *
     * @param key - key that we're trying to find
     * @param val - data value if key is found
     * @return - the key that was the closest match or null if eof.
     */
    public synchronized WritableComparable getClosest(WritableComparable key,
      Writable val)
    throws IOException {
      return getClosest(key, val, false);
    }

    /**
     * Finds the record that is the closest match to the specified key.
     *
     * @param key - key that we're trying to find
     * @param val - data value if key is found
     * @param before - IF true, and <code>key</code> does not exist, return
     * the first entry that falls just before the <code>key</code>. Otherwise,
     * return the record that sorts just after.
     * @return - the key that was the closest match or null if eof.
     */
    public synchronized WritableComparable getClosest(WritableComparable key,
        Writable val, final boolean before)
      throws IOException {

      int c = seekInternal(key, before);

      // If we didn't get an exact match, and we ended up in the wrong
      // direction relative to the query key, return null since we
      // must be at the beginning or end of the file.
      if ((!before && c > 0) ||
          (before && c < 0)) {
        return null;
      }

      data.getCurrentValue(val);
      return nextKey;
    }

    /** Close the map. The data is closed even if closing the index fails. */
    @Override
    public synchronized void close() throws IOException {
      try {
        if (!indexClosed) {
          index.close();
        }
      } finally {
        data.close();
      }
    }

  }

  /** Renames an existing map directory. */
  public static void rename(FileSystem fs, String oldName, String newName)
    throws IOException {
    Path oldDir = new Path(oldName);
    Path newDir = new Path(newName);
    if (!fs.rename(oldDir, newDir)) {
      throw new IOException("Could not rename " + oldDir + " to " + newDir);
    }
  }

  /** Deletes the named map file. */
  public static void delete(FileSystem fs, String name) throws IOException {
    Path dir = new Path(name);
    Path data = new Path(dir, DATA_FILE_NAME);
    Path index = new Path(dir, INDEX_FILE_NAME);

    fs.delete(data, true);
    fs.delete(index, true);
    fs.delete(dir, true);
  }

  /**
   * This method attempts to fix a corrupt MapFile by re-creating its index.
   * @param fs filesystem
   * @param dir directory containing the MapFile data and index
   * @param keyClass key class (has to be a subclass of Writable)
   * @param valueClass value class (has to be a subclass of Writable)
   * @param dryrun do not perform any changes, just report what needs to be done
   * @param conf configuration; also supplies the index interval
   * @return number of valid entries in this MapFile, or -1 if no fixing was needed
   * @throws Exception if the data file is missing, if its key or value class
   *         differs from the expected one, or if a file cannot be opened or
   *         closed
   */
  public static long fix(FileSystem fs, Path dir,
                         Class<? extends Writable> keyClass,
                         Class<? extends Writable> valueClass, boolean dryrun,
                         Configuration conf) throws Exception {
    String dr = (dryrun ? "[DRY RUN ] " : "");
    Path data = new Path(dir, DATA_FILE_NAME);
    Path index = new Path(dir, INDEX_FILE_NAME);
    int indexInterval = conf.getInt(Writer.INDEX_INTERVAL, 128);
    if (!fs.exists(data)) {
      // there's nothing we can do to fix this!
      throw new Exception(dr + "Missing data file in " + dir
          + ", impossible to fix this.");
    }
    if (fs.exists(index)) {
      // no fixing needed
      return -1;
    }

    long cnt = 0L;
    SequenceFile.Reader dataReader =
      new SequenceFile.Reader(conf, SequenceFile.Reader.file(data));
    SequenceFile.Writer indexWriter = null;
    try {
      if (!dataReader.getKeyClass().equals(keyClass)) {
        throw new Exception(dr + "Wrong key class in " + dir + ", expected "
            + keyClass.getName() + ", got "
            + dataReader.getKeyClass().getName());
      }
      if (!dataReader.getValueClass().equals(valueClass)) {
        throw new Exception(dr + "Wrong value class in " + dir + ", expected "
            + valueClass.getName() + ", got "
            + dataReader.getValueClass().getName());
      }
      Writable key = ReflectionUtils.newInstance(keyClass, conf);
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      if (!dryrun) {
        indexWriter =
          SequenceFile.createWriter(conf,
                                    SequenceFile.Writer.file(index),
                                    SequenceFile.Writer.keyClass(keyClass),
                                    SequenceFile.Writer.valueClass
                                      (LongWritable.class));
      }
      try {
        // Same bookkeeping as Writer#append: the first entry is always
        // indexed, then one entry per indexInterval, and never two entries
        // for the same file position.
        long lastIndexPos = -1;
        long lastIndexKeyCount = Long.MIN_VALUE;
        // Start of the first record (just past the header), not offset 0.
        long pos = dataReader.getPosition();
        LongWritable position = new LongWritable();
        long nextBlock = pos;
        boolean blockCompressed = dataReader.isBlockCompressed();
        while (dataReader.next(key, value)) {
          if (blockCompressed) {
            // The reader position only advances when a new block is loaded,
            // and then it points at the end of that block; the block that
            // holds the current record therefore starts at the previously
            // observed position.
            long curPos = dataReader.getPosition();
            if (curPos > nextBlock) {
              pos = nextBlock;                    // current block position
              nextBlock = curPos;
            }
          }
          if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
            position.set(pos);
            if (!dryrun) {
              indexWriter.append(key, position);
            }
            lastIndexPos = pos;
            lastIndexKeyCount = cnt;
          }
          if (!blockCompressed) {
            pos = dataReader.getPosition();       // next record position
          }
          cnt++;
        }
      } catch (Throwable t) {
        // truncated data file. Best effort: keep the entries indexed so far,
        // but leave a trace instead of swallowing the failure silently.
        LOG.warn(dr + "Stopped reading " + data + " after " + cnt
            + " entries; the data file may be truncated.", t);
      }
    } finally {
      try {
        dataReader.close();
      } finally {
        if (indexWriter != null) {
          indexWriter.close();
        }
      }
    }
    return cnt;
  }

  /**
   * Class to merge multiple MapFiles of same Key and Value types to one MapFile
   */
  public static class Merger {
    private Configuration conf;
    private WritableComparator comparator = null;
    private Reader[] inReaders;
    private Writer outWriter;
    private Class<Writable> valueClass = null;
    private Class<WritableComparable> keyClass = null;

    public Merger(Configuration conf) throws IOException {
      this.conf = conf;
    }

    /**
     * Merge multiple MapFiles to one Mapfile
     *
     * @param inMapFiles map directories to merge
     * @param deleteInputs whether to delete the inputs after a successful merge
     * @param outMapFile map directory to create
     * @throws IOException if reading, writing or deleting fails
     */
    public void merge(Path[] inMapFiles, boolean deleteInputs,
        Path outMapFile) throws IOException {
      try {
        open(inMapFiles, outMapFile);
        mergePass();
      } finally {
        close();
      }
      if (deleteInputs) {
        for (int i = 0; i < inMapFiles.length; i++) {
          Path path = inMapFiles[i];
          delete(path.getFileSystem(conf), path.toString());
        }
      }
    }

    /*
     * Open all input files for reading and verify the key and value types. And
     * open Output file for writing
     */
    @SuppressWarnings("unchecked")
    private void open(Path[] inMapFiles, Path outMapFile) throws IOException {
      inReaders = new Reader[inMapFiles.length];
      for (int i = 0; i < inMapFiles.length; i++) {
        Reader reader = new Reader(inMapFiles[i], conf);
        if (keyClass == null || valueClass == null) {
          keyClass = (Class<WritableComparable>) reader.getKeyClass();
          valueClass = (Class<Writable>) reader.getValueClass();
        } else if (keyClass != reader.getKeyClass()
            || valueClass != reader.getValueClass()) {
          throw new HadoopIllegalArgumentException(
              "Input files cannot be merged as they"
                  + " have different Key and Value classes");
        }
        inReaders[i] = reader;
      }

      if (comparator == null) {
        Class<? extends WritableComparable> cls;
        cls = keyClass.asSubclass(WritableComparable.class);
        this.comparator = WritableComparator.get(cls, conf);
      } else if (comparator.getKeyClass() != keyClass) {
        throw new HadoopIllegalArgumentException(
            "Input files cannot be merged as they"
                + " have different Key class compared to"
                + " specified comparator");
      }

      outWriter = new MapFile.Writer(conf, outMapFile,
          MapFile.Writer.keyClass(keyClass),
          MapFile.Writer.valueClass(valueClass));
    }

    /**
     * Merge all input files to output map file.<br>
     * 1. Read first key/value from all input files to keys/values array. <br>
     * 2. Select the least key and corresponding value. <br>
     * 3. Write the selected key and value to output file. <br>
     * 4. Replace the already written key/value in keys/values arrays with the
     * next key/value from the selected input <br>
     * 5. Repeat step 2-4 till all keys are read. <br>
     */
    private void mergePass() throws IOException {
      // re-usable array
      WritableComparable[] keys = new WritableComparable[inReaders.length];
      Writable[] values = new Writable[inReaders.length];
      // Read first key/value from all inputs
      for (int i = 0; i < inReaders.length; i++) {
        keys[i] = ReflectionUtils.newInstance(keyClass, null);
        values[i] = ReflectionUtils.newInstance(valueClass, null);
        if (!inReaders[i].next(keys[i], values[i])) {
          // Handle empty files
          keys[i] = null;
          values[i] = null;
        }
      }

      do {
        int currentEntry = -1;
        WritableComparable currentKey = null;
        Writable currentValue = null;
        for (int i = 0; i < keys.length; i++) {
          if (keys[i] == null) {
            // Skip Readers reached EOF
            continue;
          }
          if (currentKey == null || comparator.compare(currentKey, keys[i]) > 0) {
            currentEntry = i;
            currentKey = keys[i];
            currentValue = values[i];
          }
        }
        if (currentKey == null) {
          // Merge Complete
          break;
        }
        // Write the selected key/value to merge stream
        outWriter.append(currentKey, currentValue);
        // Replace the already written key/value in keys/values arrays with the
        // next key/value from the selected input
        if (!inReaders[currentEntry].next(keys[currentEntry],
            values[currentEntry])) {
          // EOF for this file
          keys[currentEntry] = null;
          values[currentEntry] = null;
        }
      } while (true);
    }

    /** Closes every opened input reader and the output writer, if any. */
    private void close() throws IOException {
      if (inReaders != null) {                    // null when open() failed early
        for (int i = 0; i < inReaders.length; i++) {
          IOUtils.closeStream(inReaders[i]);
          inReaders[i] = null;
        }
      }
      if (outWriter != null) {
        outWriter.close();
        outWriter = null;
      }
    }
  }

  /** Copies the map named by the first argument to the second argument. */
  public static void main(String[] args) throws Exception {
    String usage = "Usage: MapFile inFile outFile";

    if (args.length != 2) {
      System.err.println(usage);
      System.exit(-1);
    }

    String in = args[0];
    String out = args[1];

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    MapFile.Reader reader = null;
    MapFile.Writer writer = null;
    try {
      reader = new MapFile.Reader(fs, in, conf);
      writer =
        new MapFile.Writer(conf, fs, out,
            reader.getKeyClass().asSubclass(WritableComparable.class),
            reader.getValueClass());

      WritableComparable key = ReflectionUtils.newInstance(reader.getKeyClass()
        .asSubclass(WritableComparable.class), conf);
      Writable value = ReflectionUtils.newInstance(reader.getValueClass()
        .asSubclass(Writable.class), conf);

      while (reader.next(key, value))               // copy all entries
        writer.append(key, value);
    } finally {
      IOUtils.cleanup(LOG, writer, reader);
    }
  }
}