/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * A base class for file-based {@link InputFormat}.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobConf, int)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public static final Log LOG =
    LogFactory.getLog(FileInputFormat.class);

  public static final String NUM_INPUT_FILES =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  public static final String INPUT_DIR_RECURSIVE =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private long minSplitSize = 1;

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by {@link #listStatus(JobConf)} to apply the built-in
   * hiddenFileFilter together with a user provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Is the given filename splitable? Usually, true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split-up
   * so that {@link Mapper}s process entire files.
   *
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                                     JobConf job,
                                                     Reporter reporter)
    throws IOException;

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param filter the PathFilter class used for filtering the input paths.
   */
  public static void setInputPathFilter(JobConf conf,
                                        Class<? extends PathFilter> filter) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @return the PathFilter instance set for the job, NULL if none has been set.
   */
  public static PathFilter getInputPathFilter(JobConf conf) {
    Class<? extends PathFilter> filterClass = conf.getClass(
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
      null, PathFilter.class);
    return (filterClass != null) ?
        ReflectionUtils.newInstance(filterClass, conf) : null;
  }
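
  /*
   * Usage sketch: a job-supplied filter is AND-ed with the built-in
   * hiddenFileFilter by MultiPathFilter when the input is listed. The class
   * name and suffix below are illustrative only; the filter needs a
   * no-argument constructor because it is instantiated via ReflectionUtils.
   *
   *   public class SkipTmpFilter implements PathFilter {
   *     public boolean accept(Path p) {
   *       return !p.getName().endsWith(".tmp");   // drop temporary files
   *     }
   *   }
   *
   *   FileInputFormat.setInputPathFilter(conf, SkipTmpFilter.class);
   */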

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if zero input paths are specified.
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

    // Whether we need to recursively look into the directory structure
    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job);
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result.toArray(new FileStatus[result.size()]);
  }
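
  /*
   * Usage sketch: by default listStatus(JobConf) only lists the immediate
   * children of a matched directory, so nested sub-directories end up in the
   * result and are later rejected by getSplits(JobConf, int). A job can opt
   * into full recursion via the INPUT_DIR_RECURSIVE flag (the JobConf below
   * is illustrative):
   *
   *   JobConf conf = new JobConf();
   *   conf.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
   *
   * with which nested directories are expanded through
   * addInputPathRecursively() instead.
   */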

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: " + file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            String[] splitHosts = getSplitHosts(blkLocations,
                length - bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                splitHosts));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[] splitHosts = getSplitHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts));
          }
        } else {
          String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

  protected long computeSplitSize(long goalSize, long minSize,
                                  long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }
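
  /*
   * Worked example (sizes are illustrative): for a 1024 MB splitable file with
   * a 128 MB block size, numSplits = 4 and no configured minimum split size,
   *
   *   goalSize  = totalSize / numSplits = 1024 MB / 4 = 256 MB
   *   splitSize = max(minSize, min(goalSize, blockSize))
   *             = max(1, min(256 MB, 128 MB)) = 128 MB
   *
   * getSplits() keeps emitting 128 MB splits while more than SPLIT_SLOP (1.1)
   * split-sizes worth of bytes remain, so the final split may be up to
   * 1.1 * splitSize (about 140.8 MB) long; here the file divides evenly into
   * eight 128 MB splits.
   */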

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
    setInputPaths(conf, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(conf, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job.
   * @param inputPaths the {@link Path}s of the input directories/files
   * for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, Path... inputPaths) {
    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param path {@link Path} to be added to the list of inputs for
   *            the map-reduce job.
   */
  public static void addInputPath(JobConf conf, Path path) {
    path = new Path(conf.getWorkingDirectory(), path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR);
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
      dirs + StringUtils.COMMA_STR + dirStr);
  }
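
  /*
   * Usage sketch (paths are illustrative): relative paths are resolved against
   * the job's working directory, escaped, and stored comma-separated under the
   * INPUT_DIR property.
   *
   *   JobConf conf = new JobConf();
   *   FileInputFormat.setInputPaths(conf, new Path("/data/in1"),
   *       new Path("/data/in2"));
   *   FileInputFormat.addInputPath(conf, new Path("/data/extra"));
   */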

  // Splits the comma separated paths into individual path strings, ignoring
  // commas that appear inside curly-brace glob patterns.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobConf conf) {
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }
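
  /*
   * Parsing sketch: commas inside a curly-brace glob are not treated as path
   * separators by getPathStrings(), so (paths are illustrative)
   *
   *   FileInputFormat.setInputPaths(conf, "/logs/2015/{jan,feb},/logs/archive");
   *
   * yields exactly two input paths, "/logs/2015/{jan,feb}" and "/logs/archive".
   */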

  private void sortInDescendingOrder(List<NodeInfo> mylist) {
    Collections.sort(mylist, new Comparator<NodeInfo>() {
      public int compare(NodeInfo obj1, NodeInfo obj2) {

        if (obj1 == null || obj2 == null)
          return -1;

        if (obj1.getValue() == obj2.getValue()) {
          return 0;
        }
        else {
          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
        }
      }
    });
  }

  /**
   * This function identifies and returns the hosts that contribute
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the size of the split
   * @param clusterMap the network topology used to aggregate contributions
   * @return array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap)
      throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() +
                            blkLocations[startIndex].getLength() - offset;

    // If this is the only block, just return
    if (bytesInThisBlock >= splitSize) {
      return blkLocations[startIndex].getHosts();
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
        Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;

    Map<Node, NodeInfo> hostsMap = new IdentityHashMap<Node, NodeInfo>();
    Map<Node, NodeInfo> racksMap = new IdentityHashMap<Node, NodeInfo>();
    String[] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of
    // bytes at each level. See TestGetSplitHosts.java

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      }
      else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      }
      else {
        bytesInThisBlock = blkLocations[index].getLength();
      }

      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels

      for (String topo: allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }

        nodeInfo = hostsMap.get(node);

        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node, nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode, parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        }
        else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos

    } // for all indices

    return identifyHosts(allTopos.length, racksMap);
  }
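
  /*
   * Worked example (topology and sizes are illustrative): a split covering the
   * last 64 MB of block B1 and all 128 MB of block B2, whose replicas live on
   *
   *   B1 : /rack1/host1, /rack1/host2, /rack2/host3
   *   B2 : /rack1/host1, /rack2/host3, /rack2/host4
   *
   * credits host1 with 192 MB, host2 with 64 MB, host3 with 192 MB and host4
   * with 128 MB, while rack1 and rack2 each accumulate 192 MB (a rack is
   * credited at most once per block). identifyHosts() below then walks the
   * racks in descending order of contribution and, within each rack, the
   * hosts in descending order, returning the top hosts (as many as the last
   * block has replicas, three here) with port numbers stripped.
   */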

  private String[] identifyHosts(int replicationFactor,
                                 Map<Node, NodeInfo> racksMap) {

    String[] retVal = new String[replicationFactor];

    List<NodeInfo> rackList = new LinkedList<NodeInfo>();

    rackList.addAll(racksMap.values());

    // Sort the racks based on their contribution to this split
    sortInDescendingOrder(rackList);

    boolean done = false;
    int index = 0;

    // Get the host list for all our aggregated items, sort
    // them and return the top entries
    for (NodeInfo ni: rackList) {

      Set<NodeInfo> hostSet = ni.getLeaves();

      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
      hostList.addAll(hostSet);

      // Sort the hosts in this rack based on their contribution
      sortInDescendingOrder(hostList);

      for (NodeInfo host: hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
      }

      if (done) {
        break;
      }
    }
    return retVal;
  }

  private String[] fakeRacks(BlockLocation[] blkLocations, int index)
      throws IOException {
    String[] allHosts = blkLocations[index].getHosts();
    String[] allTopos = new String[allHosts.length];
    for (int i = 0; i < allHosts.length; i++) {
      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
    }
    return allTopos;
  }

  private static class NodeInfo {
    final Node node;
    final Set<Integer> blockIds;
    final Set<NodeInfo> leaves;

    private long value;

    NodeInfo(Node node) {
      this.node = node;
      blockIds = new HashSet<Integer>();
      leaves = new HashSet<NodeInfo>();
    }

    long getValue() { return value; }

    void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex)) {
        this.value += value;
      }
    }

    Set<NodeInfo> getLeaves() { return leaves; }

    void addLeaf(NodeInfo nodeInfo) {
      leaves.add(nodeInfo);
    }
  }
}
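
// A minimal subclass sketch (illustrative; the class name is hypothetical): a
// format that keeps each input file whole, reading it with the existing
// LineRecordReader. LongWritable and Text come from org.apache.hadoop.io.
//
//   public class NonSplittingTextInputFormat
//       extends FileInputFormat<LongWritable, Text> {
//
//     @Override
//     protected boolean isSplitable(FileSystem fs, Path file) {
//       return false;                          // one mapper per file
//     }
//
//     @Override
//     public RecordReader<LongWritable, Text> getRecordReader(
//         InputSplit split, JobConf job, Reporter reporter) throws IOException {
//       reporter.setStatus(split.toString());
//       return new LineRecordReader(job, (FileSplit) split);
//     }
//   }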