/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;

/**
 * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s from
 * the {@link InputFormat#getSplits(JobContext)} method.
 *
 * Splits are constructed from the files under the input paths.
 * A split cannot have files from different pools.
 * Each split returned may contain blocks from different files.
 * If a maxSplitSize is specified, then blocks on the same node are
 * combined to form a single split. Blocks that are left over are
 * then combined with other blocks in the same rack.
 * If maxSplitSize is not specified, then blocks from the same rack
 * are combined in a single split; no attempt is made to create
 * node-local splits.
 * If the maxSplitSize is equal to the block size, then this class
 * is similar to the default splitting behavior in Hadoop: each
 * block is a locally processed split.
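 *
 * <p>A minimal usage sketch (the subclass and per-file reader names below are
 * illustrative only, not part of this API). It assumes the common pattern of
 * delegating to {@link CombineFileRecordReader}, whose per-file reader class
 * is expected to provide a {@code (CombineFileSplit, TaskAttemptContext,
 * Integer)} constructor:
 * <pre>{@code
 * public class MyCombineInputFormat
 *     extends CombineFileInputFormat<LongWritable, Text> {
 *
 *   public MyCombineInputFormat() {
 *     // combine blocks into splits of roughly 128 MB each
 *     setMaxSplitSize(128 * 1024 * 1024);
 *   }
 *
 *   @Override
 *   public RecordReader<LongWritable, Text> createRecordReader(
 *       InputSplit split, TaskAttemptContext context) throws IOException {
 *     return new CombineFileRecordReader<LongWritable, Text>(
 *         (CombineFileSplit) split, context, MyFileRecordReader.class);
 *   }
 * }
 * }</pre>
 *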
 * Subclasses implement
 * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
 * to construct <code>RecordReader</code>'s for
 * <code>CombineFileSplit</code>'s.
 *
 * @see CombineFileSplit
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class CombineFileInputFormat<K, V>
  extends FileInputFormat<K, V> {

  private static final Log LOG =
    LogFactory.getLog(CombineFileInputFormat.class);

  public static final String SPLIT_MINSIZE_PERNODE =
    "mapreduce.input.fileinputformat.split.minsize.per.node";
  public static final String SPLIT_MINSIZE_PERRACK =
    "mapreduce.input.fileinputformat.split.minsize.per.rack";

  // ability to limit the size of a single split
  private long maxSplitSize = 0;
  private long minSplitSizeNode = 0;
  private long minSplitSizeRack = 0;

  // A pool of input path filters. A split cannot have blocks from files
  // across multiple pools.
  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();

  // mapping from a rack name to the set of nodes in the rack
  private HashMap<String, Set<String>> rackToNodes =
    new HashMap<String, Set<String>>();

  /**
   * Specify the maximum size (in bytes) of each split. Each split is
   * approximately equal to the specified size.
   */
  protected void setMaxSplitSize(long maxSplitSize) {
    this.maxSplitSize = maxSplitSize;
  }

  /**
   * Specify the minimum size (in bytes) of each split per node.
   * This applies to data that is left over after combining data on a single
   * node into splits that are of maximum size specified by maxSplitSize.
   * This leftover data will be combined into its own split if its size
   * exceeds minSplitSizeNode.
   */
  protected void setMinSplitSizeNode(long minSplitSizeNode) {
    this.minSplitSizeNode = minSplitSizeNode;
  }

  /**
   * Specify the minimum size (in bytes) of each split per rack.
   * This applies to data that is left over after combining data on a single
   * rack into splits that are of maximum size specified by maxSplitSize.
   * This leftover data will be combined into its own split if its size
   * exceeds minSplitSizeRack.
   */
  protected void setMinSplitSizeRack(long minSplitSizeRack) {
    this.minSplitSizeRack = minSplitSizeRack;
  }

  /**
   * Create a new pool and add the filters to it.
   * A split cannot have files from different pools.
   */
  protected void createPool(List<PathFilter> filters) {
    pools.add(new MultiPathFilter(filters));
  }

  /**
   * Create a new pool and add the filters to it.
   * A pathname can satisfy any one of the specified filters.
   * A split cannot have files from different pools.
   */
  protected void createPool(PathFilter... filters) {
    MultiPathFilter multi = new MultiPathFilter();
    for (PathFilter f : filters) {
      multi.add(f);
    }
    pools.add(multi);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

  /**
   * default constructor
   */
  public CombineFileInputFormat() {
  }

  @Override
  public List<InputSplit> getSplits(JobContext job)
    throws IOException {
    long minSizeNode = 0;
    long minSizeRack = 0;
    long maxSize = 0;
    Configuration conf = job.getConfiguration();

    // the values specified by setXxxSplitSize() take precedence over the
    // values that might have been specified in the config
    if (minSplitSizeNode != 0) {
      minSizeNode = minSplitSizeNode;
    } else {
      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
    }
    if (minSplitSizeRack != 0) {
      minSizeRack = minSplitSizeRack;
    } else {
      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
    }
    if (maxSplitSize != 0) {
      maxSize = maxSplitSize;
    } else {
      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
      // If maxSize is not configured, a single split will be generated per
      // node.
    }
    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
      throw new IOException("Minimum split size per rack " + minSizeRack +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than minimum split " +
                            "size per rack " + minSizeRack);
    }

    // all the files in the input set
    List<FileStatus> stats = listStatus(job);
    List<InputSplit> splits = new ArrayList<InputSplit>();
    if (stats.size() == 0) {
      return splits;
    }

    // In a single iteration, process all the paths in one pool.
    // Processing one pool at a time ensures that a split contains paths
    // from a single pool only.
    for (MultiPathFilter onepool : pools) {
      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

      // pick one input path. If it matches any of the filters in the pool,
      // add it to the output set
      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
        FileStatus p = iter.next();
        if (onepool.accept(p.getPath())) {
          myPaths.add(p); // add it to my output set
          iter.remove();
        }
      }
      // create splits for all files in this pool.
      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
    }

    // create splits for all files that are not in any pool.
    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

    // free up rackToNodes map
    rackToNodes.clear();
    return splits;
  }

  /**
   * Create splits for all the files in the specified set of paths and add
   * them to <code>splits</code>.
   */
  private void getMoreSplits(JobContext job, List<FileStatus> stats,
                             long maxSize, long minSizeNode, long minSizeRack,
                             List<InputSplit> splits)
    throws IOException {
    Configuration conf = job.getConfiguration();

    // all blocks for all the files in the input set
    OneFileInfo[] files;

    // mapping from a rack name to the list of blocks it has
    HashMap<String, List<OneBlockInfo>> rackToBlocks =
      new HashMap<String, List<OneBlockInfo>>();

    // mapping from a block to the nodes on which it has replicas
    HashMap<OneBlockInfo, String[]> blockToNodes =
      new HashMap<OneBlockInfo, String[]>();

    // mapping from a node to the list of blocks that it contains
    HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
      new HashMap<String, Set<OneBlockInfo>>();

    files = new OneFileInfo[stats.size()];
    if (stats.size() == 0) {
      return;
    }

    // populate all the blocks for all files
    long totLength = 0;
    int i = 0;
    for (FileStatus stat : stats) {
      files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
                                 rackToBlocks, blockToNodes, nodeToBlocks,
                                 rackToNodes, maxSize);
      totLength += files[i].getLength();
      i++;
    }
    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
                 maxSize, minSizeNode, minSizeRack, splits);
  }

  @VisibleForTesting
  void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                    Map<OneBlockInfo, String[]> blockToNodes,
                    Map<String, List<OneBlockInfo>> rackToBlocks,
                    long totLength,
                    long maxSize,
                    long minSizeNode,
                    long minSizeRack,
                    List<InputSplit> splits
                   ) {
    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
    long curSplitSize = 0;

    int totalNodes = nodeToBlocks.size();
    long totalLength = totLength;

    Multiset<String> splitsPerNode = HashMultiset.create();
    Set<String> completedNodes = new HashSet<String>();

    while (true) {
      // maxSize is allowed to be 0; in that case load smoothing across nodes
      // is disabled.

      // process all nodes and create splits that are local to a node. Generate
      // one split per node iteration, and walk over nodes multiple times to
      // distribute the splits across nodes.
      for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
          .entrySet().iterator(); iter.hasNext();) {
        Map.Entry<String, Set<OneBlockInfo>> one = iter.next();

        String node = one.getKey();

        // Skip the node if it has previously been marked as completed.
        if (completedNodes.contains(node)) {
          continue;
        }

        Set<OneBlockInfo> blocksInCurrentNode = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
        while (oneBlockIter.hasNext()) {
          OneBlockInfo oneblock = oneBlockIter.next();

          // Remove all blocks which may already have been assigned to other
          // splits.
          if (!blockToNodes.containsKey(oneblock)) {
            oneBlockIter.remove();
            continue;
          }

          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size exceeds the maximum, then
          // create this split.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            curSplitSize = 0;

            splitsPerNode.add(node);

            // Remove entries from blocksInCurrentNode so that we don't walk
            // these again.
            blocksInCurrentNode.removeAll(validBlocks);
            validBlocks.clear();

            // Done creating a single split for this node. Move on to the next
            // node so that splits are distributed across nodes.
            break;
          }

        }
        if (validBlocks.size() != 0) {
          // This implies that the last few blocks (or all, in case maxSize=0)
          // were not part of a split. The node is complete.

          // if there were any blocks left over and their combined size is
          // larger than minSizeNode, then combine them into one split.
          // Otherwise add them back to the unprocessed pool. It is likely
          // that they will be combined with other blocks from the
          // same rack later on.
          // This condition also kicks in when max split size is not set. All
          // blocks on a node will be grouped together into a single split.
          if (minSizeNode != 0 && curSplitSize >= minSizeNode
              && splitsPerNode.count(node) == 0) {
            // No split has been created for this node yet, so it is ok to add
            // a smaller one for parallelism. (Otherwise the blocks would be
            // grouped at the rack level for more balanced sizes.) Create an
            // input split and add it to the splits array.
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            splitsPerNode.add(node);
            // Remove entries from blocksInCurrentNode so that we don't walk
            // this again.
            blocksInCurrentNode.removeAll(validBlocks);
            // The node is done. This was the last set of blocks for this node.
          } else {
            // Put the unplaced blocks back into the pool for later
            // rack-level allocation.
            for (OneBlockInfo oneblock : validBlocks) {
              blockToNodes.put(oneblock, oneblock.hosts);
            }
          }
          validBlocks.clear();
          curSplitSize = 0;
          completedNodes.add(node);
        } else { // No in-flight blocks.
          if (blocksInCurrentNode.size() == 0) {
            // Node is done. All blocks were fit into node-local splits.
            completedNodes.add(node);
          } // else Run through the node again.
        }
      }

      // Check if node-local assignments are complete.
      if (completedNodes.size() == totalNodes || totalLength == 0) {
        // All nodes have been walked over and marked as completed or all
        // blocks have been assigned. The rest should be handled via
        // rack-local assignment.
        LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
            + completedNodes.size() + ", size left: " + totalLength);
        break;
      }
    }

    // if blocks in a rack are below the specified minimum size, then keep them
    // in 'overflow'. After the processing of all racks is complete, these
    // overflow blocks will be combined into splits.
    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
    Set<String> racks = new HashSet<String>();

    // Process all racks over and over again until there is no more work to do.
    while (blockToNodes.size() > 0) {

      // Create one split for this rack before moving on to the next rack.
      // Come back to this rack after creating a single split for each of the
      // remaining racks.
      // Process one rack at a time: combine all possible blocks that
      // reside on this rack into one split (constrained by the minimum and
      // maximum split sizes).

      // iterate over all racks
      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
           rackToBlocks.entrySet().iterator(); iter.hasNext();) {

        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
        racks.add(one.getKey());
        List<OneBlockInfo> blocks = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        boolean createdSplit = false;
        for (OneBlockInfo oneblock : blocks) {
          if (blockToNodes.containsKey(oneblock)) {
            validBlocks.add(oneblock);
            blockToNodes.remove(oneblock);
            curSplitSize += oneblock.length;

            // if the accumulated split size exceeds the maximum, then
            // create this split.
            if (maxSize != 0 && curSplitSize >= maxSize) {
              // create an input split and add it to the splits array
              addCreatedSplit(splits, getHosts(racks), validBlocks);
              createdSplit = true;
              break;
            }
          }
        }

        // if we created a split, then just go to the next rack
        if (createdSplit) {
          curSplitSize = 0;
          validBlocks.clear();
          racks.clear();
          continue;
        }

        if (!validBlocks.isEmpty()) {
          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
            // if there is a minimum size specified, then create a single split
            // otherwise, store these blocks into overflow data structure
            addCreatedSplit(splits, getHosts(racks), validBlocks);
          } else {
            // There were a few blocks in this rack that
            // remained to be processed. Keep them in 'overflow' block list.
            // These will be combined later.
            overflowBlocks.addAll(validBlocks);
          }
        }
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    assert blockToNodes.isEmpty();
    assert curSplitSize == 0;
    assert validBlocks.isEmpty();
    assert racks.isEmpty();

    // Process all overflow blocks
    for (OneBlockInfo oneblock : overflowBlocks) {
      validBlocks.add(oneblock);
      curSplitSize += oneblock.length;

      // This might cause an existing rack location to be re-added,
      // but it should be ok.
      for (int i = 0; i < oneblock.racks.length; i++) {
        racks.add(oneblock.racks[i]);
      }

      // if the accumulated split size exceeds the maximum, then
      // create this split.
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(splits, getHosts(racks), validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    // Process any remaining blocks.
    if (!validBlocks.isEmpty()) {
      addCreatedSplit(splits, getHosts(racks), validBlocks);
    }
  }

  /**
   * Create a single split from the list of blocks specified in validBlocks
   * and add this new split to splitList.
   */
  private void addCreatedSplit(List<InputSplit> splitList,
                               Collection<String> locations,
                               ArrayList<OneBlockInfo> validBlocks) {
    // create an input split
    Path[] fl = new Path[validBlocks.size()];
    long[] offset = new long[validBlocks.size()];
    long[] length = new long[validBlocks.size()];
    for (int i = 0; i < validBlocks.size(); i++) {
      fl[i] = validBlocks.get(i).onepath;
      offset[i] = validBlocks.get(i).offset;
      length[i] = validBlocks.get(i).length;
    }
    // add this split to the list that is returned
    CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
                                   length, locations.toArray(new String[0]));
    splitList.add(thissplit);
  }

  /**
   * Subclasses implement this to construct a {@link RecordReader} for a
   * {@link CombineFileSplit}.
   */
  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException;

  /**
   * information about one file from the File System
   */
  @VisibleForTesting
  static class OneFileInfo {
    private long fileSize;               // size of the file
    private OneBlockInfo[] blocks;       // all blocks in this file

    OneFileInfo(FileStatus stat, Configuration conf,
                boolean isSplitable,
                HashMap<String, List<OneBlockInfo>> rackToBlocks,
                HashMap<OneBlockInfo, String[]> blockToNodes,
                HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
                HashMap<String, Set<String>> rackToNodes,
                long maxSize)
                throws IOException {
      this.fileSize = 0;

      // get block locations from file system
      BlockLocation[] locations;
      if (stat instanceof LocatedFileStatus) {
        locations = ((LocatedFileStatus) stat).getBlockLocations();
      } else {
        FileSystem fs = stat.getPath().getFileSystem(conf);
        locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
      }
      // create a list of all blocks and their locations
      if (locations == null) {
        blocks = new OneBlockInfo[0];
      } else {

        if (locations.length == 0 && !stat.isDirectory()) {
          locations = new BlockLocation[] { new BlockLocation() };
        }

        if (!isSplitable) {
          // if the file is not splitable, just create one block with the
          // full file length
          blocks = new OneBlockInfo[1];
          fileSize = stat.getLen();
          blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
                                       locations[0].getHosts(),
                                       locations[0].getTopologyPaths());
        } else {
          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
              locations.length);
          for (int i = 0; i < locations.length; i++) {
            fileSize += locations[i].getLength();

            // each split can be a maximum of maxSize
            long left = locations[i].getLength();
            long myOffset = locations[i].getOffset();
            long myLength = 0;
            do {
              if (maxSize == 0) {
                myLength = left;
              } else {
                if (left > maxSize && left < 2 * maxSize) {
                  // If the remainder is between maxSize and 2*maxSize, then
                  // instead of creating splits of size maxSize and
                  // (left - maxSize), create two splits of size left/2 each.
                  // This is a heuristic to avoid creating really small
                  // splits.
                  myLength = left / 2;
                } else {
                  myLength = Math.min(maxSize, left);
                }
              }
              OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
                  myOffset, myLength, locations[i].getHosts(),
                  locations[i].getTopologyPaths());
              left -= myLength;
              myOffset += myLength;

              blocksList.add(oneblock);
            } while (left > 0);
          }
          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
        }

        populateBlockInfo(blocks, rackToBlocks, blockToNodes,
                          nodeToBlocks, rackToNodes);
      }
    }

    @VisibleForTesting
    static void populateBlockInfo(OneBlockInfo[] blocks,
                          Map<String, List<OneBlockInfo>> rackToBlocks,
                          Map<OneBlockInfo, String[]> blockToNodes,
                          Map<String, Set<OneBlockInfo>> nodeToBlocks,
                          Map<String, Set<String>> rackToNodes) {
      for (OneBlockInfo oneblock : blocks) {
        // add this block to the block --> node locations map
        blockToNodes.put(oneblock, oneblock.hosts);

        // For blocks that do not have host/rack information,
        // assign to default rack.
        String[] racks = null;
        if (oneblock.hosts.length == 0) {
          racks = new String[]{NetworkTopology.DEFAULT_RACK};
        } else {
          racks = oneblock.racks;
        }

        // add this block to the rack --> block map
        for (int j = 0; j < racks.length; j++) {
          String rack = racks[j];
          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
          if (blklist == null) {
            blklist = new ArrayList<OneBlockInfo>();
            rackToBlocks.put(rack, blklist);
          }
          blklist.add(oneblock);
          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
            // Add this host to rackToNodes map
            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
          }
        }

        // add this block to the node --> block map
        for (int j = 0; j < oneblock.hosts.length; j++) {
          String node = oneblock.hosts[j];
          Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
          if (blklist == null) {
            blklist = new LinkedHashSet<OneBlockInfo>();
            nodeToBlocks.put(node, blklist);
          }
          blklist.add(oneblock);
        }
      }
    }

    long getLength() {
      return fileSize;
    }

    OneBlockInfo[] getBlocks() {
      return blocks;
    }
  }

  /**
   * information about one block from the File System
   */
  @VisibleForTesting
  static class OneBlockInfo {
    Path onepath;                // name of this file
    long offset;                 // offset in file
    long length;                 // length of this block
    String[] hosts;              // nodes on which this block resides
    String[] racks;              // network topology of hosts

    OneBlockInfo(Path path, long offset, long len,
                 String[] hosts, String[] topologyPaths) {
      this.onepath = path;
      this.offset = offset;
      this.hosts = hosts;
      this.length = len;
      assert (hosts.length == topologyPaths.length ||
              topologyPaths.length == 0);

      // if the file system does not have any rack information, then
      // use dummy rack location.
      if (topologyPaths.length == 0) {
        topologyPaths = new String[hosts.length];
        for (int i = 0; i < topologyPaths.length; i++) {
          topologyPaths[i] = (new NodeBase(hosts[i],
                              NetworkTopology.DEFAULT_RACK)).toString();
        }
      }

      // The topology paths have the host name included as the last
      // component. Strip it.
      this.racks = new String[topologyPaths.length];
      for (int i = 0; i < topologyPaths.length; i++) {
        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
      }
    }
  }

  protected BlockLocation[] getFileBlockLocations(
    FileSystem fs, FileStatus stat) throws IOException {
    if (stat instanceof LocatedFileStatus) {
      return ((LocatedFileStatus) stat).getBlockLocations();
    }
    return fs.getFileBlockLocations(stat, 0, stat.getLen());
  }

  private static void addHostToRack(Map<String, Set<String>> rackToNodes,
                                    String rack, String host) {
    Set<String> hosts = rackToNodes.get(rack);
    if (hosts == null) {
      hosts = new HashSet<String>();
      rackToNodes.put(rack, hosts);
    }
    hosts.add(host);
  }

  private Set<String> getHosts(Set<String> racks) {
    Set<String> hosts = new HashSet<String>();
    for (String rack : racks) {
      if (rackToNodes.containsKey(rack)) {
        hosts.addAll(rackToNodes.get(rack));
      }
    }
    return hosts;
  }

  /**
   * Accept a path only if at least one of the filters given in the
   * constructor accepts it.
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter() {
      this.filters = new ArrayList<PathFilter>();
    }

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public void add(PathFilter one) {
      filters.add(one);
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (filter.accept(path)) {
          return true;
        }
      }
      return false;
    }

    public String toString() {
      StringBuffer buf = new StringBuffer();
      buf.append("[");
      for (PathFilter f : filters) {
        buf.append(f);
        buf.append(",");
      }
      buf.append("]");
      return buf.toString();
    }
  }
}