/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
      "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
      "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
      "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
      "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
      "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
      "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  /**
   * A trailing remainder of up to this factor times the split size is kept
   * as a single split instead of being cut once more (10% slop).
   */
  private static final double SPLIT_SLOP = 1.1;

  @Deprecated
  public enum Counter {
    BYTES_READ
  }

  /** Rejects paths whose last component starts with "_" or ".". */
  private static final PathFilter hiddenFileFilter = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by {@link #listStatus(JobContext)} to apply the
   * built-in hiddenFileFilter together with a user provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private final List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    @Override
    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Set whether input directories should be read recursively.
   *
   * @param job the job to modify
   * @param inputDirRecursive value to store under
   *          {@link #INPUT_DIR_RECURSIVE}
   */
  public static void setInputDirRecursive(Job job,
                                          boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
        inputDirRecursive);
  }

  /**
   * Get whether input directories should be read recursively.
   *
   * @param job the job to look at.
   * @return the value of {@link #INPUT_DIR_RECURSIVE}, <code>false</code>
   *         if it has not been set
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
        false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splittable? Usually, true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split-up
   * so that {@link Mapper}s process entire files.
   *
   * @param context the job context
   * @param filename the file name to check
   * @return is this file splittable?
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
        PathFilter.class);
  }

  /**
   * Set the minimum input split size.
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   *         (1 if {@link #SPLIT_MINSIZE} has not been set)
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size.
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   *         ({@link Long#MAX_VALUE} if {@link #SPLIT_MAXSIZE} has not been
   *         set)
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
        Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job to look at.
   * @return the PathFilter instance set for the job, NULL if none has been set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }

  /**
   * List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * <p>When {@link #LIST_STATUS_NUM_THREADS} is 1 the listing is done by
   * this class in the calling thread; for any other value it is delegated
   * to {@link LocatedFileStatusFetcher}.
   *
   * @param job the job to list input paths for
   * @return list of FileStatus objects
   * @throws IOException if no input paths are configured, if listing fails,
   *         or if the calling thread is interrupted while the fetcher is
   *         collecting file statuses.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    // Whether we need to look recursively into the directory structure
    boolean recursive = getInputDirRecursive(job);

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result;

    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
    Stopwatch sw = new Stopwatch().start();
    if (numThreads == 1) {
      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    } else {
      Iterable<FileStatus> locatedFiles;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher =
            new LocatedFileStatusFetcher(
                job.getConfiguration(), dirs, recursive, inputFilter, true);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still
        // observe the interruption, and keep the original exception as the
        // cause instead of discarding it.
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while getting file statuses", e);
      }
      result = Lists.newArrayList(locatedFiles);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }

  /**
   * Expand every input path with {@link FileSystem#globStatus} and collect
   * the matching files in the calling thread. Matched directories are
   * listed one level deep, or fully when <code>recursive</code> is set.
   * Problems with individual paths are collected and reported together.
   *
   * @param job the job whose configuration is used to resolve file systems
   * @param dirs the input paths (possibly glob patterns)
   * @param inputFilter filter applied to the glob and to directory entries
   * @param recursive whether sub-directories are descended into
   * @return the accepted file statuses
   * @throws IOException an {@link InvalidInputException} wrapping one error
   *         per input path that does not exist or matches no files, or any
   *         error raised by the file system
   */
  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (Path p : dirs) {
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat : matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  // Without recursion a sub-directory entry is added as-is.
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException if listing a directory fails
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }


  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   *
   * @param file the file the split belongs to
   * @param start offset of the first byte of the split within the file
   * @param length number of bytes in the split
   * @param hosts hosts passed on to the {@link FileSplit}
   * @return the new split
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * Generate the list of files and make them into FileSplits.
   *
   * <p>Zero-length files produce one empty split with no hosts. Files for
   * which {@link #isSplitable(JobContext, Path)} returns <code>false</code>
   * produce one split covering the whole file. All other files are cut into
   * pieces of {@link #computeSplitSize(long, long, long)} bytes; the last
   * piece may be up to 10% larger than that size. The number of input files
   * is stored in the job configuration under {@link #NUM_INPUT_FILES}.
   *
   * @param job the job context
   * @return the splits for all input files
   * @throws IOException if listing the input or fetching block locations
   *         fails
   */
  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          // Block locations were already fetched together with the listing.
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          // Keep cutting full-size splits while more than SPLIT_SLOP split
          // sizes remain, so the tail never becomes a tiny extra split.
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                blkLocations[blkIndex].getHosts()));
          }
        } else { // not splittable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }

  /**
   * Compute the split size: the block size capped at <code>maxSize</code>,
   * but never less than <code>minSize</code>.
   *
   * @param blockSize block size of the file
   * @param minSize lower bound for the split size
   * @param maxSize upper bound applied to the block size
   * @return <code>max(minSize, min(maxSize, blockSize))</code>
   */
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  /**
   * Find the index of the block that contains the given file offset.
   *
   * @param blkLocations the block locations to search
   * @param offset the byte offset to look up
   * @return index into <code>blkLocations</code> of the first block whose
   *         range <code>[offset, offset + length)</code> contains the offset
   * @throws IllegalArgumentException if no block contains the offset,
   *         including when <code>blkLocations</code> is empty
   */
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    if (blkLocations.length == 0) {
      // There is no last block to derive the file length from; report the
      // problem with the same exception type as the out-of-range case
      // instead of failing with an ArrayIndexOutOfBoundsException below.
      throw new IllegalArgumentException("Offset " + offset +
                                         " is outside of file (no block locations)");
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   * @throws IOException if a path's file system cannot be resolved
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   * @throws IOException if a path's file system cannot be resolved
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job. Each path is qualified against its file system
   * and escaped before being stored under {@link #INPUT_DIR}.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   * for the map-reduce job.
   * @throws IOException if a path's file system cannot be resolved
   * @throws IllegalArgumentException if no path is given
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    if (inputPaths == null || inputPaths.length == 0) {
      throw new IllegalArgumentException(
          "At least one input path must be specified");
    }
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    // The builder never leaves this method, so the unsynchronized
    // StringBuilder is sufficient.
    StringBuilder str =
        new StringBuilder(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *            the map-reduce job.
   * @throws IOException if the path's file system cannot be resolved
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR,
        dirs == null ? dirStr : dirs + StringUtils.COMMA_STR + dirStr);
  }

  /**
   * Split a comma separated list of paths, leaving commas that appear
   * inside a <code>{...}</code> glob group untouched.
   *
   * @param commaSeparatedPaths the raw path list
   * @return the individual path strings, in order of appearance
   */
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{':
          curlyOpen++;
          globPattern = true;
          break;
        case '}':
          curlyOpen--;
          // NOTE: an unmatched '}' drives the depth negative, after which a
          // later group no longer closes at depth 0. Kept as-is so existing
          // inputs are parsed exactly as before.
          if (curlyOpen == 0) {
            globPattern = false;
          }
          break;
        case ',':
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        default:
          break; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job; one
   *         entry per element produced by splitting the {@link #INPUT_DIR}
   *         value, each un-escaped before being turned into a {@link Path}.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}