/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;

/**
 * A base class for file-based {@link InputFormat}.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobConf, int)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
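 *
 * <p>For illustration only, a minimal subclass might look like the sketch
 * below ({@code MyRecordReader} is a hypothetical reader, not part of this
 * API):
 * <pre>{@code
 * public class WholeFileInputFormat extends FileInputFormat<Text, Text> {
 *   protected boolean isSplitable(FileSystem fs, Path file) {
 *     return false;   // hand each file to a single Mapper
 *   }
 *   public RecordReader<Text, Text> getRecordReader(InputSplit split,
 *       JobConf job, Reporter reporter) throws IOException {
 *     return new MyRecordReader((FileSplit) split, job);
 *   }
 * }
 * }</pre>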
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public static final Log LOG =
      LogFactory.getLog(FileInputFormat.class);

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  public static final String NUM_INPUT_FILES =
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  public static final String INPUT_DIR_RECURSIVE =
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private long minSplitSize = 1;

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split-up
   * so that {@link Mapper}s process entire files.
   *
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                                     JobConf job,
                                                     Reporter reporter)
      throws IOException;

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(JobConf conf,
                                        Class<? extends PathFilter> filter) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @return the PathFilter instance set for the job, NULL if none has been set.
   */
  public static PathFilter getInputPathFilter(JobConf conf) {
    Class<? extends PathFilter> filterClass = conf.getClass(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
        null, PathFilter.class);
    return (filterClass != null) ?
        ReflectionUtils.newInstance(filterClass, conf) : null;
  }
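  // Illustrative use (a sketch, not part of this class): driver code would
  // typically register a custom filter with
  //   FileInputFormat.setInputPathFilter(conf, MyPathFilter.class);
  // where MyPathFilter is a hypothetical class implementing PathFilter; the
  // filter is applied on top of the built-in hiddenFileFilter when the input
  // paths are listed by listStatus().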
  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if zero input paths are specified.
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

    // Whether we need to look into the directory structure recursively
    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

    // Creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    FileStatus[] result;
    int numThreads = job.getInt(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);

    Stopwatch sw = new Stopwatch().start();
    if (numThreads == 1) {
      List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
      result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job, dirs, recursive, inputFilter, false);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses");
      }
      result = Iterables.toArray(locatedFiles, FileStatus.class);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
    }
    LOG.info("Total input paths to process : " + result.length);
    return result;
  }
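  // Illustrative note (not from the original comments): a job can parallelize
  // the listing above by setting, e.g.,
  //   conf.setInt(org.apache.hadoop.mapreduce.lib.input.FileInputFormat
  //       .LIST_STATUS_NUM_THREADS, 10);   // 10 is an arbitrary example value
  // otherwise the default of a single thread uses singleThreadedListStatus()
  // below.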
  private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job);
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }
    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/
  public InputSplit[] getSplits(JobConf job, int numSplits)
      throws IOException {
    Stopwatch sw = new Stopwatch().start();
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: " + file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[] splitHosts = getSplitHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[] splitHosts = getSplitHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts));
          }
        } else {
          String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }

  protected long computeSplitSize(long goalSize, long minSize,
                                  long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }
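  // Worked example (illustrative numbers only): with goalSize = totalSize /
  // numSplits = 200 MB, minSize = 1 and blockSize = 128 MB, computeSplitSize
  // returns max(1, min(200 MB, 128 MB)) = 128 MB, so splits line up with
  // blocks unless the SPLIT_MINSIZE setting (or setMinSplitSize above) pushes
  // minSize beyond the block size.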
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
    setInputPaths(conf, StringUtils.stringToPath(
        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(conf, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job.
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, Path... inputPaths) {
    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(JobConf conf, Path path) {
    path = new Path(conf.getWorkingDirectory(), path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR);
    conf.set(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
        dirs + StringUtils.COMMA_STR + dirStr);
  }
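  // Illustrative use (sketch): input paths are normally configured in the job
  // driver, e.g.
  //   FileInputFormat.setInputPaths(conf, new Path("/data/in"));
  //   FileInputFormat.addInputPath(conf, new Path("/data/extra"));
  // or, equivalently, as a comma separated string:
  //   FileInputFormat.setInputPaths(conf, "/data/in,/data/extra");
  // ("/data/in" and "/data/extra" are made-up example paths).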
  // This method splits the comma separated input paths, leaving commas that
  // appear inside a {...} glob pattern intact.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobConf conf) {
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }
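  // Example (illustrative): getPathStrings("/a,/b/{x,y}") returns
  // {"/a", "/b/{x,y}"}: the comma inside the {x,y} glob does not split the
  // list, and getInputPaths() above unescapes each stored entry when reading
  // the configured value back.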
  private void sortInDescendingOrder(List<NodeInfo> mylist) {
    Collections.sort(mylist, new Comparator<NodeInfo>() {
      public int compare(NodeInfo obj1, NodeInfo obj2) {

        if (obj1 == null || obj2 == null)
          return -1;

        if (obj1.getValue() == obj2.getValue()) {
          return 0;
        }
        else {
          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
        }
      }
    });
  }

  /**
   * This function identifies and returns the hosts that contribute
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the size of the split
   * @return array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap)
      throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() +
        blkLocations[startIndex].getLength() - offset;

    // If this is the only block, just return
    if (bytesInThisBlock >= splitSize) {
      return blkLocations[startIndex].getHosts();
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
          Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;

    Map<Node, NodeInfo> hostsMap = new IdentityHashMap<Node, NodeInfo>();
    Map<Node, NodeInfo> racksMap = new IdentityHashMap<Node, NodeInfo>();
    String[] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of
    // bytes at each level. See TestGetSplitHosts.java

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      }
      else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      }
      else {
        bytesInThisBlock = blkLocations[index].getLength();
      }

      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels

      for (String topo: allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }

        nodeInfo = hostsMap.get(node);

        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node, nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode, parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        }
        else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos

    } // for all indices

    return identifyHosts(allTopos.length, racksMap);
  }
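  // Illustrative example (not from the original comments): if a split spans
  // blocks b1 and b2, a host holding replicas of both accumulates
  // bytes(b1) + bytes(b2) in hostsMap above, so identifyHosts below ranks it
  // ahead of a host that only holds one of them; racks are ranked first in
  // the same way.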
  private String[] identifyHosts(int replicationFactor,
                                 Map<Node, NodeInfo> racksMap) {

    String[] retVal = new String[replicationFactor];

    List<NodeInfo> rackList = new LinkedList<NodeInfo>();

    rackList.addAll(racksMap.values());

    // Sort the racks based on their contribution to this split
    sortInDescendingOrder(rackList);

    boolean done = false;
    int index = 0;

    // Get the host list for all our aggregated items, sort
    // them and return the top entries
    for (NodeInfo ni: rackList) {

      Set<NodeInfo> hostSet = ni.getLeaves();

      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
      hostList.addAll(hostSet);

      // Sort the hosts in this rack based on their contribution
      sortInDescendingOrder(hostList);

      for (NodeInfo host: hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
      }

      if (done == true) {
        break;
      }
    }
    return retVal;
  }

  private String[] fakeRacks(BlockLocation[] blkLocations, int index)
      throws IOException {
    String[] allHosts = blkLocations[index].getHosts();
    String[] allTopos = new String[allHosts.length];
    for (int i = 0; i < allHosts.length; i++) {
      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
    }
    return allTopos;
  }


  private static class NodeInfo {
    final Node node;
    final Set<Integer> blockIds;
    final Set<NodeInfo> leaves;

    private long value;

    NodeInfo(Node node) {
      this.node = node;
      blockIds = new HashSet<Integer>();
      leaves = new HashSet<NodeInfo>();
    }

    long getValue() { return value; }

    void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex) == true) {
        this.value += value;
      }
    }

    Set<NodeInfo> getLeaves() { return leaves; }

    void addLeaf(NodeInfo nodeInfo) {
      leaves.add(nodeInfo);
    }
  }
}