/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {

  public static final String INPUT_DIR =
      "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
      "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
      "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
      "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
      "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
      "mapreduce.input.fileinputformat.input.dir.recursive";

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Set whether input directories should be read recursively.
   * @param job the job to modify
   * @param inputDirRecursive whether to descend into subdirectories
   */
  public static void setInputDirRecursive(Job job,
                                          boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
        inputDirRecursive);
  }

  /**
   * @param job the job to look at.
   * @return whether the input files should be read recursively
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
        false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split
   * up, so that {@link Mapper}s process entire files.
   *
   * @param context the job context
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }
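
  // Illustrative sketch (not part of this class): a subclass can override
  // isSplitable() to keep each file in a single split, e.g. when records
  // cannot be read starting at an arbitrary byte offset. The class name
  // below is hypothetical.
  //
  //   public class WholeFileTextInputFormat extends TextInputFormat {
  //     @Override
  //     protected boolean isSplitable(JobContext context, Path filename) {
  //       return false;  // one mapper consumes the whole file
  //     }
  //   }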
  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
        PathFilter.class);
  }

  /**
   * Set the minimum input split size.
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size.
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
        Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, null if none has been set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }
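
  // Illustrative sketch: installing a custom PathFilter on a job. The filter
  // below (hypothetical) keeps only ".log" files; listStatus() applies it on
  // top of the built-in hiddenFileFilter. The filter class needs a no-arg
  // constructor so ReflectionUtils can instantiate it.
  //
  //   public static class LogFilesOnly implements PathFilter {
  //     public boolean accept(Path p) {
  //       return p.getName().endsWith(".log");
  //     }
  //   }
  //
  //   FileInputFormat.setInputPathFilter(job, LogFilesOnly.class);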
  /**
   * List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified, or if any input
   *         path does not exist or matches no files.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    // Whether we need to look recursively into the directory structure
    boolean recursive = getInputDirRecursive(job);

    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result The List to store all files.
   * @param fs The FileSystem.
   * @param path The input path.
   * @param inputFilter The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }
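
  // Illustrative sketch: makeSplit() is the single factory used by
  // getSplits() below, so a subclass can substitute its own FileSplit
  // subtype without reimplementing the split computation. MyFileSplit is
  // hypothetical.
  //
  //   @Override
  //   protected FileSplit makeSplit(Path file, long start, long length,
  //       String[] hosts) {
  //     return new MyFileSplit(file, start, length, hosts);
  //   }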
  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @return the list of {@link InputSplit}s for the job
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                blkLocations[blkIndex].getHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }
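
  // Worked example (illustrative): with the defaults minSize = 1 and
  // maxSize = Long.MAX_VALUE, a 128 MB block size gives
  //   computeSplitSize(blockSize, minSize, maxSize)
  //     = max(1, min(Long.MAX_VALUE, 128 MB)) = 128 MB,
  // i.e. one split per block. Raising the minimum forces larger splits:
  //
  //   FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024); // 256 MB
  //   // now computeSplitSize(128 MB, 256 MB, Long.MAX_VALUE) = 256 MB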
  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  // Splits the comma separated path list into individual path strings,
  // treating commas inside curly-brace glob groups (e.g. {a,b}) as part of
  // the glob pattern rather than as path separators.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{': {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}': {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',': {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}
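
// Illustrative driver sketch (not part of this file): typical input wiring
// for a job that uses a FileInputFormat subclass. Paths, the job name, and
// the configuration are hypothetical.
//
//   Configuration conf = new Configuration();
//   Job job = Job.getInstance(conf, "example");
//   job.setInputFormatClass(TextInputFormat.class);
//   FileInputFormat.addInputPath(job, new Path("/data/in"));
//   FileInputFormat.addInputPaths(job, "/data/extra1,/data/extra2");
//   FileInputFormat.setInputDirRecursive(job, true); // descend into subdirs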