/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
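 *
 * <p>A minimal driver sketch (the paths and sizes below are illustrative
 * assumptions, not defaults of this class):
 * <pre>{@code
 * Job job = Job.getInstance(new Configuration(), "example");
 * job.setInputFormatClass(TextInputFormat.class);
 * FileInputFormat.setInputPaths(job, new Path("/data/input"));
 * FileInputFormat.setMinInputSplitSize(job, 64L << 20);   // 64 MB lower bound
 * FileInputFormat.setMaxInputSplitSize(job, 256L << 20);  // 256 MB upper bound
 * }</pre>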
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
      "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
      "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
      "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
      "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
      "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
      "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  // Allow the last split to be up to 10% larger than the target split size
  // rather than producing a tiny trailing split.
  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  // Files whose names start with '_' or '.' (e.g. _SUCCESS, .crc) are
  // treated as hidden and skipped by default.
  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Set whether input directories should be searched recursively.
   * @param job the job to modify
   * @param inputDirRecursive true to read input directories recursively
   */
  public static void setInputDirRecursive(Job job,
      boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
        inputDirRecursive);
  }

  /**
   * @param job the job to look at.
   * @return true if the input directories should be read recursively.
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
        false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <p><code>FileInputFormat</code> implementations can override this and
   * return <code>false</code> to ensure that individual input files are
   * never split up, so that {@link Mapper}s process entire files.
   *
   * @param context the job context
   * @param filename the file name to check
   * @return true if this file is splitable, false otherwise
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }
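
  /* A hedged sketch of an isSplitable override (the subclass name and the
   * suffix check are illustrative; production subclasses typically consult
   * the configured compression codec instead of the file extension):
   *
   *   public class WholeGzipTextInputFormat extends TextInputFormat {
   *     @Override
   *     protected boolean isSplitable(JobContext context, Path file) {
   *       return !file.getName().endsWith(".gz"); // gzip streams cannot be split
   *     }
   *   }
   */
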
  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
      Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
        PathFilter.class);
  }

  /**
   * Set the minimum input split size.
   * @param job the job to modify
   * @param size the minimum size, in bytes
   */
  public static void setMinInputSplitSize(Job job,
      long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum input split size.
   * @param job the job to modify
   * @param size the maximum split size, in bytes
   */
  public static void setMaxInputSplitSize(Job job,
      long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
        Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, or null if none has
   *         been set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }
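
  /* A minimal sketch of wiring a user-supplied filter through
   * setInputPathFilter() above (AvroOnlyFilter is an illustrative name; the
   * class needs a no-argument constructor so ReflectionUtils can instantiate
   * it). Note that the filter also sees directory paths during listing, so a
   * pure suffix filter will prune subdirectories when recursing:
   *
   *   public class AvroOnlyFilter implements PathFilter {
   *     public boolean accept(Path p) {
   *       return p.getName().endsWith(".avro");
   *     }
   *   }
   *
   *   FileInputFormat.setInputPathFilter(job, AvroOnlyFilter.class);
   */
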
  /**
   * List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return a list of FileStatus objects
   * @throws IOException if no input paths are specified or a path is invalid
   */
  protected List<FileStatus> listStatus(JobContext job
      ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // Get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
        job.getConfiguration());

    // Whether we need to look recursively into the directory structure
    boolean recursive = getInputDirRecursive(job);

    // Create a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result = null;

    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
    Stopwatch sw = new Stopwatch().start();
    if (numThreads == 1) {
      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job.getConfiguration(), dirs, recursive, inputFilter, true);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses");
      }
      result = Lists.newArrayList(locatedFiles);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }

  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat : matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result The List to store all files.
   * @param fs The FileSystem.
   * @param path The input path.
   * @param inputFilter The input filter that can be used to filter files/dirs.
   * @throws IOException if listing the path fails
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }
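
  /* A configuration sketch for the listing behavior above (the thread count
   * is an illustrative value; both knobs are defined in this class):
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, 8); // parallel listing
   *   FileInputFormat.setInputDirRecursive(job, true);         // descend into subdirs
   */
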
  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
      String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
      String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }

  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @return the list of {@link InputSplit}s for the job
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
              blkLocations[0].getCachedHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
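
  /* Worked example for computeSplitSize() and the SPLIT_SLOP check in
   * getSplits() (numbers are illustrative): with a 128 MB block size,
   * minSize = 1 and maxSize = Long.MAX_VALUE, the split size is
   * max(1, min(Long.MAX_VALUE, 128 MB)) = 128 MB. A 140 MB file then yields
   * a single 140 MB split, because 140/128 = 1.09 does not exceed SPLIT_SLOP
   * (1.1); a 145 MB file yields a 128 MB split plus a 17 MB remainder split.
   */
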
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
      String commaSeparatedPaths
      ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
      String commaSeparatedPaths
      ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(Job job,
      Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(Job job,
      Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  // Splits the comma-separated path list, treating commas inside curly-brace
  // glob patterns (e.g. {a,b}) as part of the path rather than as separators.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }
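
  /* Behavior sketch for getPathStrings() (paths are illustrative): commas
   * inside curly-brace glob groups do not act as separators, so
   *
   *   getPathStrings("/logs/2024-{01,02}/*,/extra/input")
   *
   * yields two entries: "/logs/2024-{01,02}/*" and "/extra/input".
   */
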
  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}
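
/* End-to-end sketch of the input-path round trip (all paths are illustrative
 * assumptions):
 *
 *   Job job = Job.getInstance(new Configuration(), "example");
 *   FileInputFormat.addInputPath(job, new Path("/data/a"));
 *   FileInputFormat.addInputPaths(job, "/data/b,/data/c");
 *   Path[] inputs = FileInputFormat.getInputPaths(job); // three qualified paths
 */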