/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;

/**
 * A base class for file-based {@link InputFormat}.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobConf, int)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
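 *
 * <p>For illustration only, a minimal old-API job configuration using the
 * concrete {@link TextInputFormat} subclass might look like the following
 * (the driver class and input path are hypothetical):
 * <pre>
 *   JobConf job = new JobConf(MyDriver.class);
 *   job.setInputFormat(TextInputFormat.class);
 *   FileInputFormat.setInputPaths(job, new Path("/example/input"));
 * </pre>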
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public static final Log LOG =
    LogFactory.getLog(FileInputFormat.class);

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  public static final String NUM_INPUT_FILES =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  public static final String INPUT_DIR_RECURSIVE =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;


  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private long minSplitSize = 1;
  private static final PathFilter hiddenFileFilter = new PathFilter(){
      public boolean accept(Path p){
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };
  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split-up
   * so that {@link Mapper}s process entire files.
   *
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                                     JobConf job,
                                                     Reporter reporter)
    throws IOException;

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param filter the PathFilter class used for filtering the input paths.
   */
  public static void setInputPathFilter(JobConf conf,
                                        Class<? extends PathFilter> filter) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @return the PathFilter instance set for the job, NULL if none has been set.
   */
  public static PathFilter getInputPathFilter(JobConf conf) {
    Class<? extends PathFilter> filterClass = conf.getClass(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
        null, PathFilter.class);
    return (filterClass != null) ?
        ReflectionUtils.newInstance(filterClass, conf) : null;
  }
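
  // Illustrative sketch only (the filter class name and suffix are hypothetical):
  // a custom filter registered via setInputPathFilter() is combined by listStatus()
  // with the built-in hiddenFileFilter, so a path must be accepted by both.
  //
  //   public static class TextOnlyFilter implements PathFilter {
  //     public boolean accept(Path p) {
  //       return p.getName().endsWith(".txt");
  //     }
  //   }
  //   FileInputFormat.setInputPathFilter(job, TextOnlyFilter.class);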

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified or the listing fails
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

    // Whether we need to recursively look into the directory structure
    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    FileStatus[] result;
    int numThreads = job
        .getInt(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);

    Stopwatch sw = new Stopwatch().start();
    if (numThreads == 1) {
      List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive);
      result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job, dirs, recursive, inputFilter, false);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses");
      }
      result = Iterables.toArray(locatedFiles, FileStatus.class);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
    }
    LOG.info("Total input paths to process : " + result.length);
    return result;
  }
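
  // Note (illustrative): the listing above is single-threaded by default; a job
  // with very many input directories could raise the thread count before submit,
  // e.g. job.setInt(org.apache.hadoop.mapreduce.lib.input
  //     .FileInputFormat.LIST_STATUS_NUM_THREADS, 8);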

  private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job);
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }
    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }

  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    Stopwatch sw = new Stopwatch().start();
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: " + file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }
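
  // Worked example (illustrative numbers only): with a 128 MB block size, a 1 byte
  // minimum and goalSize = totalSize / numSplits = 200 MB, computeSplitSize() below
  // returns max(1, min(200 MB, 128 MB)) = 128 MB, so splits line up with blocks.
  // The SPLIT_SLOP factor above allows the last split to be up to 10% larger than
  // splitSize rather than spilling a tiny extra split.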
  protected long computeSplitSize(long goalSize, long minSize,
                                  long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
    setInputPaths(conf, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(conf, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job.
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, Path... inputPaths) {
    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for(int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param path {@link Path} to be added to the list of inputs for
   *            the map-reduce job.
   */
  public static void addInputPath(JobConf conf, Path path) {
    path = new Path(conf.getWorkingDirectory(), path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR);
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
      dirs + StringUtils.COMMA_STR + dirStr);
  }

  // This method splits the comma separated input path string, treating commas
  // inside {...} glob patterns as part of the path rather than as separators.
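  // For example (illustrative paths): "/data/a,/data/{jan,feb}" yields the two
  // strings "/data/a" and "/data/{jan,feb}"; the comma inside the braces is kept.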
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobConf conf) {
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, "");
    String [] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }


  private void sortInDescendingOrder(List<NodeInfo> mylist) {
    Collections.sort(mylist, new Comparator<NodeInfo> () {
      public int compare(NodeInfo obj1, NodeInfo obj2) {

        if (obj1 == null || obj2 == null)
          return -1;

        if (obj1.getValue() == obj2.getValue()) {
          return 0;
        }
        else {
          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
        }
      }
    }
    );
  }

  /**
   * This function identifies and returns the hosts that contribute
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split in the file
   * @param splitSize the length of the split
   * @param clusterMap the network topology used to aggregate contributions
   * @return an array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
    return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
        clusterMap)[0];
  }

  /**
   * This function identifies and returns the hosts that contribute
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split in the file
   * @param splitSize the length of the split
   * @param clusterMap the network topology used to aggregate contributions
   * @return two arrays - one of hosts that contribute most to this split, and
   *    one of hosts that contribute most to this split that have the data
   *    cached on them
   * @throws IOException
   */
  private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap)
      throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() +
                            blkLocations[startIndex].getLength() - offset;

    //If this is the only block, just return
    if (bytesInThisBlock >= splitSize) {
      return new String[][] { blkLocations[startIndex].getHosts(),
          blkLocations[startIndex].getCachedHosts() };
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
        Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;

    Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
    Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
    String [] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of
    // bytes at each level. See TestGetSplitHosts.java
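    //
    // Illustrative example (hypothetical hosts): if the split spans two blocks and
    // host h2 holds a replica of both while h1 and h3 each hold only one, h2 (and
    // its rack) accumulates the bytes of both blocks and therefore sorts ahead of
    // h1 and h3 when the top hosts are picked in identifyHosts().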

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      }
      else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      }
      else {
        bytesInThisBlock = blkLocations[index].getLength();
      }

      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels

      for (String topo: allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }

        nodeInfo = hostsMap.get(node);

        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node, nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode, parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        }
        else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos

    } // for all indices

    // We don't yet support cached hosts when bytesInThisBlock > splitSize
    return new String[][] { identifyHosts(allTopos.length, racksMap),
        new String[0]};
  }

  private String[] identifyHosts(int replicationFactor,
                                 Map<Node,NodeInfo> racksMap) {

    String [] retVal = new String[replicationFactor];

    List <NodeInfo> rackList = new LinkedList<NodeInfo>();

    rackList.addAll(racksMap.values());

    // Sort the racks based on their contribution to this split
    sortInDescendingOrder(rackList);

    boolean done = false;
    int index = 0;

    // Get the host list for all our aggregated items, sort
    // them and return the top entries
    for (NodeInfo ni: rackList) {

      Set<NodeInfo> hostSet = ni.getLeaves();

      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
      hostList.addAll(hostSet);

      // Sort the hosts in this rack based on their contribution
      sortInDescendingOrder(hostList);

      for (NodeInfo host: hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
      }

      if (done) {
        break;
      }
    }
    return retVal;
  }

  private String[] fakeRacks(BlockLocation[] blkLocations, int index)
      throws IOException {
    String[] allHosts = blkLocations[index].getHosts();
    String[] allTopos = new String[allHosts.length];
    for (int i = 0; i < allHosts.length; i++) {
      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
    }
    return allTopos;
  }


  private static class NodeInfo {
    final Node node;
    final Set<Integer> blockIds;
    final Set<NodeInfo> leaves;

    private long value;

    NodeInfo(Node node) {
      this.node = node;
      blockIds = new HashSet<Integer>();
      leaves = new HashSet<NodeInfo>();
    }

    long getValue() {return value;}

    void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex)) {
        this.value += value;
      }
    }

    Set<NodeInfo> getLeaves() { return leaves;}

    void addLeaf(NodeInfo nodeInfo) {
      leaves.add(nodeInfo);
    }
  }
}