/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.Iterables;

/**
 * A base class for file-based {@link InputFormat}.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobConf, int)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
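 *
 * <p>For example, a job driver might wire in a concrete subclass as follows
 * (the driver class and input path below are illustrative only):
 *
 * <pre>{@code
 * JobConf job = new JobConf(MyDriver.class);     // hypothetical driver class
 * job.setInputFormat(TextInputFormat.class);     // a concrete FileInputFormat
 * FileInputFormat.setInputPaths(job, new Path("/data/input"));
 * }</pre>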
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public static final Log LOG =
      LogFactory.getLog(FileInputFormat.class);

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  public static final String NUM_INPUT_FILES =
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  public static final String INPUT_DIR_RECURSIVE =
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;


  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private long minSplitSize = 1;
  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split-up
   * so that {@link Mapper}s process entire files.
   *
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                                     JobConf job,
                                                     Reporter reporter)
      throws IOException;

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(JobConf conf,
                                        Class<? extends PathFilter> filter) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @return the PathFilter instance set for the job, NULL if none has been set.
   */
  public static PathFilter getInputPathFilter(JobConf conf) {
    Class<? extends PathFilter> filterClass = conf.getClass(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
        null, PathFilter.class);
    return (filterClass != null) ?
        ReflectionUtils.newInstance(filterClass, conf) : null;
  }
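
  // Illustrative use of the filter hooks above (LogFilter is a hypothetical
  // user class): it is applied in addition to the built-in hiddenFileFilter.
  //
  //   public static class LogFilter implements PathFilter {
  //     public boolean accept(Path p) { return p.getName().endsWith(".log"); }
  //   }
  //   FileInputFormat.setInputPathFilter(job, LogFilter.class);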

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified, or an input path
   *         does not exist or matches no files.
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

    // Whether we need to recursively look into the directory structure
    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    FileStatus[] result;
    int numThreads = job
        .getInt(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);

    StopWatch sw = new StopWatch().start();
    if (numThreads == 1) {
      List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
      result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job, dirs, recursive, inputFilter, false);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses");
      }
      result = Iterables.toArray(locatedFiles, FileStatus.class);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: "
          + sw.now(TimeUnit.MILLISECONDS));
    }
    LOG.info("Total input paths to process : " + result.length);
    return result;
  }

  private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job);
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
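            // A matched directory is expanded one level here; deeper
            // sub-directories are only descended into when recursive
            // listing (INPUT_DIR_RECURSIVE) is enabled.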
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }
    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }

  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/
  public InputSplit[] getSplits(JobConf job, int numSplits)
      throws IOException {
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
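
    // For reference: computeSplitSize() below yields
    // max(minSize, min(goalSize, blockSize)). For example, with a goalSize of
    // 200MB, minSize of 1 and a 128MB block size, splits fall on 128MB block
    // boundaries; setting SPLIT_MINSIZE above the block size forces splits
    // that span multiple blocks.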

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0,
              length, clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }

  protected long computeSplitSize(long goalSize, long minSize,
                                  long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
    setInputPaths(conf, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }
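
  // Note on comma handling in the String-based setters above and below: a
  // comma inside a glob's curly braces (for example "/data/{a,b}/*.gz") is
  // kept as part of that single path pattern rather than treated as a path
  // separator; see getPathStrings(String).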

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(conf, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job.
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, Path... inputPaths) {
    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for(int i = 1; i < inputPaths.length;i++) {
      str.append(StringUtils.COMMA_STR);
      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param path {@link Path} to be added to the list of inputs for
   *             the map-reduce job.
   */
  public static void addInputPath(JobConf conf, Path path ) {
    path = new Path(conf.getWorkingDirectory(), path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR);
    conf.set(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
        dirs + StringUtils.COMMA_STR + dirStr);
  }

  // Splits a comma separated list of paths, keeping commas that appear inside
  // a glob's curly braces as part of that path rather than treating them as
  // separators.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i=0; i<length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1 ;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobConf conf) {
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
        FileInputFormat.INPUT_DIR, "");
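    // Each stored path had any literal commas escaped when it was added, so
    // split on unescaped commas here and then un-escape each entry.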
    String [] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }


  private void sortInDescendingOrder(List<NodeInfo> mylist) {
    Collections.sort(mylist, new Comparator<NodeInfo> () {
      public int compare(NodeInfo obj1, NodeInfo obj2) {

        if (obj1 == null || obj2 == null)
          return -1;

        if (obj1.getValue() == obj2.getValue()) {
          return 0;
        }
        else {
          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
        }
      }
    }
    );
  }

  /**
   * This function identifies and returns the hosts that contribute
   * most to a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the length of the split
   * @param clusterMap the network topology used to resolve hosts and racks
   * @return an array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
    return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
        clusterMap)[0];
  }

  /**
   * This function identifies and returns the hosts that contribute
   * most to a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the length of the split
   * @param clusterMap the network topology used to resolve hosts and racks
   * @return two arrays - one of hosts that contribute most to this split, and
   *    one of hosts that contribute most to this split that have the data
   *    cached on them
   * @throws IOException
   */
  private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap)
      throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() +
                            blkLocations[startIndex].getLength() - offset;

    //If this is the only block, just return
    if (bytesInThisBlock >= splitSize) {
      return new String[][] { blkLocations[startIndex].getHosts(),
          blkLocations[startIndex].getCachedHosts() };
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
          Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;

    Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
    Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
    String [] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of
    // bytes at each level. See TestGetSplitHosts.java
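    // hostsMap tracks, per host node, how many bytes of this split that host
    // holds; racksMap aggregates the same byte counts per rack so that rack
    // totals can be compared alongside host totals below.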

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      }
      else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      }
      else {
        bytesInThisBlock = blkLocations[index].getLength();
      }

      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels

      for (String topo: allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }

        nodeInfo = hostsMap.get(node);

        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node,nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode,parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        }
        else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos

    } // for all indices

    // We don't yet support cached hosts when bytesInThisBlock > splitSize
    return new String[][] { identifyHosts(allTopos.length, racksMap),
        new String[0]};
  }

  private String[] identifyHosts(int replicationFactor,
                                 Map<Node,NodeInfo> racksMap) {

    String [] retVal = new String[replicationFactor];

    List <NodeInfo> rackList = new LinkedList<NodeInfo>();

    rackList.addAll(racksMap.values());

    // Sort the racks based on their contribution to this split
    sortInDescendingOrder(rackList);

    boolean done = false;
    int index = 0;

    // Get the host list for all our aggregated items, sort
    // them and return the top entries
    for (NodeInfo ni: rackList) {

      Set<NodeInfo> hostSet = ni.getLeaves();

      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
      hostList.addAll(hostSet);

      // Sort the hosts in this rack based on their contribution
      sortInDescendingOrder(hostList);

      for (NodeInfo host: hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
      }

      if (done) {
        break;
      }
    }
    return retVal;
  }

  private String[] fakeRacks(BlockLocation[] blkLocations, int index)
      throws IOException {
    String[] allHosts = blkLocations[index].getHosts();
    String[] allTopos = new String[allHosts.length];
    for (int i = 0; i < allHosts.length; i++) {
      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
    }
    return allTopos;
  }


  private static class NodeInfo {
    final Node node;
    final Set<Integer> blockIds;
    final Set<NodeInfo> leaves;

    private long value;

    NodeInfo(Node node) {
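      // A NodeInfo wraps either a host node or a rack node; blockIds records
      // which block indices have already contributed, so addValue() counts a
      // given block's bytes at most once for this node.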
      this.node = node;
      blockIds = new HashSet<Integer>();
      leaves = new HashSet<NodeInfo>();
    }

    long getValue() { return value; }

    void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex)) {
        this.value += value;
      }
    }

    Set<NodeInfo> getLeaves() { return leaves; }

    void addLeaf(NodeInfo nodeInfo) {
      leaves.add(nodeInfo);
    }
  }
}