/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.Lists;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
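 *
 * <p>For illustration only, a minimal driver sketch; the job name and input
 * path below are examples, not part of this API:
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Job job = Job.getInstance(conf, "example-job");
 * job.setInputFormatClass(TextInputFormat.class);
 * FileInputFormat.addInputPath(job, new Path("/example/input"));
 * }</pre>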
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
    "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
    "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
    "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Set whether input directories should be traversed recursively.
   * @param job the job to modify
   * @param inputDirRecursive whether to recurse into subdirectories of
   *                          the input directories
   */
  public static void setInputDirRecursive(Job job,
                                          boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
                                      inputDirRecursive);
  }

  /**
   * @param job the job to look at.
   * @return whether the files should be read recursively
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
                                             false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split
   * up, so that {@link Mapper}s process entire files.
   *
   * @param context the job context
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
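   *
   * <p>A hedged sketch of a custom filter, given a configured {@link Job}
   * {@code job}; the class name {@code TextFilesOnly} is hypothetical, not
   * part of this API:
   * <pre>{@code
   * public class TextFilesOnly implements PathFilter {
   *   public boolean accept(Path p) {
   *     return p.getName().endsWith(".txt");
   *   }
   * }
   * FileInputFormat.setInputPathFilter(job, TextFilesOnly.class);
   * }</pre>
   *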
   * @param job the job to modify
   * @param filter the PathFilter class used for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
                                    PathFilter.class);
  }

  /**
   * Set the minimum input split size.
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size.
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                              Long.MAX_VALUE);
  }
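
  // Illustration (hedged; the sizes below are examples): the effective split
  // size is max(minSize, min(maxSize, blockSize)), see computeSplitSize()
  // further down, so callers typically bound it via the two setters above:
  //
  //   FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
  //   FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB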

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, NULL if none has been set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified or an input path
   *         does not exist.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    // Whether we need to look recursively into the directory structure
    boolean recursive = getInputDirRecursive(job);

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result = null;

    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
    StopWatch sw = new StopWatch().start();
    if (numThreads == 1) {
      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job.getConfiguration(), dirs, recursive, inputFilter, true);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses");
      }
      result = Lists.newArrayList(locatedFiles);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: "
          + sw.now(TimeUnit.MILLISECONDS));
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }

  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException on errors while listing the path
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }
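
  // Illustration (hedged): recursive listing is off by default, so files in
  // subdirectories of an input directory are skipped unless enabled. The
  // path below is an example, not part of this API:
  //
  //   FileInputFormat.setInputDirRecursive(job, true);
  //   FileInputFormat.addInputPath(job, new Path("/example/logs"));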

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }

  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException when the input files cannot be listed or located
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }
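
  // Worked example of the SPLIT_SLOP rule above (hedged; sizes illustrative):
  // a 260 MB file with splitSize = 128 MB first emits one 128 MB split
  // (260/128 = 2.03 > 1.1). The remaining 132 MB is not split again, since
  // 132/128 = 1.03 <= 1.1; it becomes a single final split instead of a
  // 128 MB split plus a tiny 4 MB remainder.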

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }
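
  // Illustration (hedged; the paths are examples): commas separate input
  // specs, but commas inside a {...} glob group are preserved by
  // getPathStrings() below:
  //
  //   FileInputFormat.setInputPaths(job, "/data/a,/data/{2019,2020}/part-*");
  //   // yields two inputs: "/data/a" and "/data/{2019,2020}/part-*"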

  // Splits the comma separated list of paths, keeping commas that appear
  // inside {...} glob groups as part of a single path.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}