001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.input;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.List;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.hadoop.fs.FileStatus;
031 import org.apache.hadoop.fs.FileSystem;
032 import org.apache.hadoop.fs.LocatedFileStatus;
033 import org.apache.hadoop.fs.Path;
034 import org.apache.hadoop.fs.PathFilter;
035 import org.apache.hadoop.fs.BlockLocation;
036 import org.apache.hadoop.fs.RemoteIterator;
037 import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
038 import org.apache.hadoop.mapreduce.InputFormat;
039 import org.apache.hadoop.mapreduce.InputSplit;
040 import org.apache.hadoop.mapreduce.Job;
041 import org.apache.hadoop.mapreduce.JobContext;
042 import org.apache.hadoop.mapreduce.Mapper;
043 import org.apache.hadoop.mapreduce.security.TokenCache;
044 import org.apache.hadoop.util.ReflectionUtils;
045 import org.apache.hadoop.util.StringUtils;
046
047 import com.google.common.base.Stopwatch;
048 import com.google.common.collect.Lists;
049
050 /**
051 * A base class for file-based {@link InputFormat}s.
052 *
053 * <p><code>FileInputFormat</code> is the base class for all file-based
054 * <code>InputFormat</code>s. This provides a generic implementation of
055 * {@link #getSplits(JobContext)}.
056 * Subclasses of <code>FileInputFormat</code> can also override the
057 * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
058 * not split-up and are processed as a whole by {@link Mapper}s.
059 */
060 @InterfaceAudience.Public
061 @InterfaceStability.Stable
062 public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
/**
 * Configuration key under which the job's input paths are stored as a
 * comma-separated list of escaped path strings (written by
 * {@link #setInputPaths(Job, Path...)} and {@link #addInputPath(Job, Path)},
 * read back by {@link #getInputPaths(JobContext)}).
 */
public static final String INPUT_DIR =
    "mapreduce.input.fileinputformat.inputdir";
/** Configuration key for the maximum split size, see {@link #getMaxSplitSize(JobContext)}. */
public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
/** Configuration key for the minimum split size, see {@link #getMinSplitSize(JobContext)}. */
public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
/** Configuration key naming the user-supplied {@link PathFilter} class, if any. */
public static final String PATHFILTER_CLASS =
    "mapreduce.input.pathFilter.class";
/** Configuration key into which {@link #getSplits(JobContext)} records the number of input files. */
public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
/** Configuration key for the recursive-listing flag, see {@link #getInputDirRecursive(JobContext)}. */
public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";
/**
 * Configuration key for the number of threads used by
 * {@link #listStatus(JobContext)}; a value of 1 selects the single-threaded
 * listing path.
 */
public static final String LIST_STATUS_NUM_THREADS =
    "mapreduce.input.fileinputformat.list-status.num-threads";
/** Value used for {@link #LIST_STATUS_NUM_THREADS} when the key is not set. */
public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

// 10% slop: getSplits() keeps cutting full-size splits only while the bytes
// remaining exceed 1.1 x splitSize, so the final split of a file may be up to
// 10% larger than splitSize instead of leaving a tiny trailing split.
private static final double SPLIT_SLOP = 1.1; // 10% slop
082
083 @Deprecated
084 public static enum Counter {
085 BYTES_READ
086 }
087
088 private static final PathFilter hiddenFileFilter = new PathFilter(){
089 public boolean accept(Path p){
090 String name = p.getName();
091 return !name.startsWith("_") && !name.startsWith(".");
092 }
093 };
094
095 /**
096 * Proxy PathFilter that accepts a path only if all filters given in the
097 * constructor do. Used by the listPaths() to apply the built-in
098 * hiddenFileFilter together with a user provided one (if any).
099 */
100 private static class MultiPathFilter implements PathFilter {
101 private List<PathFilter> filters;
102
103 public MultiPathFilter(List<PathFilter> filters) {
104 this.filters = filters;
105 }
106
107 public boolean accept(Path path) {
108 for (PathFilter filter : filters) {
109 if (!filter.accept(path)) {
110 return false;
111 }
112 }
113 return true;
114 }
115 }
116
117 /**
118 * @param job
119 * the job to modify
120 * @param inputDirRecursive
121 */
122 public static void setInputDirRecursive(Job job,
123 boolean inputDirRecursive) {
124 job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
125 inputDirRecursive);
126 }
127
128 /**
129 * @param job
130 * the job to look at.
131 * @return should the files to be read recursively?
132 */
133 public static boolean getInputDirRecursive(JobContext job) {
134 return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
135 false);
136 }
137
138 /**
139 * Get the lower bound on split size imposed by the format.
140 * @return the number of bytes of the minimal split for this format
141 */
142 protected long getFormatMinSplitSize() {
143 return 1;
144 }
145
146 /**
147 * Is the given filename splitable? Usually, true, but if the file is
148 * stream compressed, it will not be.
149 *
150 * <code>FileInputFormat</code> implementations can override this and return
151 * <code>false</code> to ensure that individual input files are never split-up
152 * so that {@link Mapper}s process entire files.
153 *
154 * @param context the job context
155 * @param filename the file name to check
156 * @return is this file splitable?
157 */
158 protected boolean isSplitable(JobContext context, Path filename) {
159 return true;
160 }
161
162 /**
163 * Set a PathFilter to be applied to the input paths for the map-reduce job.
164 * @param job the job to modify
165 * @param filter the PathFilter class use for filtering the input paths.
166 */
167 public static void setInputPathFilter(Job job,
168 Class<? extends PathFilter> filter) {
169 job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
170 PathFilter.class);
171 }
172
173 /**
174 * Set the minimum input split size
175 * @param job the job to modify
176 * @param size the minimum size
177 */
178 public static void setMinInputSplitSize(Job job,
179 long size) {
180 job.getConfiguration().setLong(SPLIT_MINSIZE, size);
181 }
182
183 /**
184 * Get the minimum split size
185 * @param job the job
186 * @return the minimum number of bytes that can be in a split
187 */
188 public static long getMinSplitSize(JobContext job) {
189 return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
190 }
191
192 /**
193 * Set the maximum split size
194 * @param job the job to modify
195 * @param size the maximum split size
196 */
197 public static void setMaxInputSplitSize(Job job,
198 long size) {
199 job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
200 }
201
202 /**
203 * Get the maximum split size.
204 * @param context the job to look at.
205 * @return the maximum number of bytes a split can include
206 */
207 public static long getMaxSplitSize(JobContext context) {
208 return context.getConfiguration().getLong(SPLIT_MAXSIZE,
209 Long.MAX_VALUE);
210 }
211
212 /**
213 * Get a PathFilter instance of the filter set for the input paths.
214 *
215 * @return the PathFilter instance set for the job, NULL if none has been set.
216 */
217 public static PathFilter getInputPathFilter(JobContext context) {
218 Configuration conf = context.getConfiguration();
219 Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
220 PathFilter.class);
221 return (filterClass != null) ?
222 (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
223 }
224
225 /** List input directories.
226 * Subclasses may override to, e.g., select only files matching a regular
227 * expression.
228 *
229 * @param job the job to list input paths for
230 * @return array of FileStatus objects
231 * @throws IOException if zero items.
232 */
233 protected List<FileStatus> listStatus(JobContext job
234 ) throws IOException {
235 Path[] dirs = getInputPaths(job);
236 if (dirs.length == 0) {
237 throw new IOException("No input paths specified in job");
238 }
239
240 // get tokens for all the required FileSystems..
241 TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
242 job.getConfiguration());
243
244 // Whether we need to recursive look into the directory structure
245 boolean recursive = getInputDirRecursive(job);
246
247 // creates a MultiPathFilter with the hiddenFileFilter and the
248 // user provided one (if any).
249 List<PathFilter> filters = new ArrayList<PathFilter>();
250 filters.add(hiddenFileFilter);
251 PathFilter jobFilter = getInputPathFilter(job);
252 if (jobFilter != null) {
253 filters.add(jobFilter);
254 }
255 PathFilter inputFilter = new MultiPathFilter(filters);
256
257 List<FileStatus> result = null;
258
259 int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
260 DEFAULT_LIST_STATUS_NUM_THREADS);
261 Stopwatch sw = new Stopwatch().start();
262 if (numThreads == 1) {
263 result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
264 } else {
265 Iterable<FileStatus> locatedFiles = null;
266 try {
267 LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
268 job.getConfiguration(), dirs, recursive, inputFilter, true);
269 locatedFiles = locatedFileStatusFetcher.getFileStatuses();
270 } catch (InterruptedException e) {
271 throw new IOException("Interrupted while getting file statuses");
272 }
273 result = Lists.newArrayList(locatedFiles);
274 }
275
276 sw.stop();
277 if (LOG.isDebugEnabled()) {
278 LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
279 }
280 LOG.info("Total input paths to process : " + result.size());
281 return result;
282 }
283
284 private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
285 PathFilter inputFilter, boolean recursive) throws IOException {
286 List<FileStatus> result = new ArrayList<FileStatus>();
287 List<IOException> errors = new ArrayList<IOException>();
288 for (int i=0; i < dirs.length; ++i) {
289 Path p = dirs[i];
290 FileSystem fs = p.getFileSystem(job.getConfiguration());
291 FileStatus[] matches = fs.globStatus(p, inputFilter);
292 if (matches == null) {
293 errors.add(new IOException("Input path does not exist: " + p));
294 } else if (matches.length == 0) {
295 errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
296 } else {
297 for (FileStatus globStat: matches) {
298 if (globStat.isDirectory()) {
299 RemoteIterator<LocatedFileStatus> iter =
300 fs.listLocatedStatus(globStat.getPath());
301 while (iter.hasNext()) {
302 LocatedFileStatus stat = iter.next();
303 if (inputFilter.accept(stat.getPath())) {
304 if (recursive && stat.isDirectory()) {
305 addInputPathRecursively(result, fs, stat.getPath(),
306 inputFilter);
307 } else {
308 result.add(stat);
309 }
310 }
311 }
312 } else {
313 result.add(globStat);
314 }
315 }
316 }
317 }
318
319 if (!errors.isEmpty()) {
320 throw new InvalidInputException(errors);
321 }
322 return result;
323 }
324
325 /**
326 * Add files in the input path recursively into the results.
327 * @param result
328 * The List to store all files.
329 * @param fs
330 * The FileSystem.
331 * @param path
332 * The input path.
333 * @param inputFilter
334 * The input filter that can be used to filter files/dirs.
335 * @throws IOException
336 */
337 protected void addInputPathRecursively(List<FileStatus> result,
338 FileSystem fs, Path path, PathFilter inputFilter)
339 throws IOException {
340 RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
341 while (iter.hasNext()) {
342 LocatedFileStatus stat = iter.next();
343 if (inputFilter.accept(stat.getPath())) {
344 if (stat.isDirectory()) {
345 addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
346 } else {
347 result.add(stat);
348 }
349 }
350 }
351 }
352
353
354 /**
355 * A factory that makes the split for this class. It can be overridden
356 * by sub-classes to make sub-types
357 */
358 protected FileSplit makeSplit(Path file, long start, long length,
359 String[] hosts) {
360 return new FileSplit(file, start, length, hosts);
361 }
362
363 /**
364 * Generate the list of files and make them into FileSplits.
365 * @param job the job context
366 * @throws IOException
367 */
368 public List<InputSplit> getSplits(JobContext job) throws IOException {
369 Stopwatch sw = new Stopwatch().start();
370 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
371 long maxSize = getMaxSplitSize(job);
372
373 // generate splits
374 List<InputSplit> splits = new ArrayList<InputSplit>();
375 List<FileStatus> files = listStatus(job);
376 for (FileStatus file: files) {
377 Path path = file.getPath();
378 long length = file.getLen();
379 if (length != 0) {
380 BlockLocation[] blkLocations;
381 if (file instanceof LocatedFileStatus) {
382 blkLocations = ((LocatedFileStatus) file).getBlockLocations();
383 } else {
384 FileSystem fs = path.getFileSystem(job.getConfiguration());
385 blkLocations = fs.getFileBlockLocations(file, 0, length);
386 }
387 if (isSplitable(job, path)) {
388 long blockSize = file.getBlockSize();
389 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
390
391 long bytesRemaining = length;
392 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
393 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
394 splits.add(makeSplit(path, length-bytesRemaining, splitSize,
395 blkLocations[blkIndex].getHosts()));
396 bytesRemaining -= splitSize;
397 }
398
399 if (bytesRemaining != 0) {
400 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
401 splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
402 blkLocations[blkIndex].getHosts()));
403 }
404 } else { // not splitable
405 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
406 }
407 } else {
408 //Create empty hosts array for zero length files
409 splits.add(makeSplit(path, 0, length, new String[0]));
410 }
411 }
412 // Save the number of input files for metrics/loadgen
413 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
414 sw.stop();
415 if (LOG.isDebugEnabled()) {
416 LOG.debug("Total # of splits generated by getSplits: " + splits.size()
417 + ", TimeTaken: " + sw.elapsedMillis());
418 }
419 return splits;
420 }
421
422 protected long computeSplitSize(long blockSize, long minSize,
423 long maxSize) {
424 return Math.max(minSize, Math.min(maxSize, blockSize));
425 }
426
427 protected int getBlockIndex(BlockLocation[] blkLocations,
428 long offset) {
429 for (int i = 0 ; i < blkLocations.length; i++) {
430 // is the offset inside this block?
431 if ((blkLocations[i].getOffset() <= offset) &&
432 (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
433 return i;
434 }
435 }
436 BlockLocation last = blkLocations[blkLocations.length -1];
437 long fileLength = last.getOffset() + last.getLength() -1;
438 throw new IllegalArgumentException("Offset " + offset +
439 " is outside of file (0.." +
440 fileLength + ")");
441 }
442
443 /**
444 * Sets the given comma separated paths as the list of inputs
445 * for the map-reduce job.
446 *
447 * @param job the job
448 * @param commaSeparatedPaths Comma separated paths to be set as
449 * the list of inputs for the map-reduce job.
450 */
451 public static void setInputPaths(Job job,
452 String commaSeparatedPaths
453 ) throws IOException {
454 setInputPaths(job, StringUtils.stringToPath(
455 getPathStrings(commaSeparatedPaths)));
456 }
457
458 /**
459 * Add the given comma separated paths to the list of inputs for
460 * the map-reduce job.
461 *
462 * @param job The job to modify
463 * @param commaSeparatedPaths Comma separated paths to be added to
464 * the list of inputs for the map-reduce job.
465 */
466 public static void addInputPaths(Job job,
467 String commaSeparatedPaths
468 ) throws IOException {
469 for (String str : getPathStrings(commaSeparatedPaths)) {
470 addInputPath(job, new Path(str));
471 }
472 }
473
474 /**
475 * Set the array of {@link Path}s as the list of inputs
476 * for the map-reduce job.
477 *
478 * @param job The job to modify
479 * @param inputPaths the {@link Path}s of the input directories/files
480 * for the map-reduce job.
481 */
482 public static void setInputPaths(Job job,
483 Path... inputPaths) throws IOException {
484 Configuration conf = job.getConfiguration();
485 Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
486 StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
487 for(int i = 1; i < inputPaths.length;i++) {
488 str.append(StringUtils.COMMA_STR);
489 path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
490 str.append(StringUtils.escapeString(path.toString()));
491 }
492 conf.set(INPUT_DIR, str.toString());
493 }
494
495 /**
496 * Add a {@link Path} to the list of inputs for the map-reduce job.
497 *
498 * @param job The {@link Job} to modify
499 * @param path {@link Path} to be added to the list of inputs for
500 * the map-reduce job.
501 */
502 public static void addInputPath(Job job,
503 Path path) throws IOException {
504 Configuration conf = job.getConfiguration();
505 path = path.getFileSystem(conf).makeQualified(path);
506 String dirStr = StringUtils.escapeString(path.toString());
507 String dirs = conf.get(INPUT_DIR);
508 conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
509 }
510
511 // This method escapes commas in the glob pattern of the given paths.
512 private static String[] getPathStrings(String commaSeparatedPaths) {
513 int length = commaSeparatedPaths.length();
514 int curlyOpen = 0;
515 int pathStart = 0;
516 boolean globPattern = false;
517 List<String> pathStrings = new ArrayList<String>();
518
519 for (int i=0; i<length; i++) {
520 char ch = commaSeparatedPaths.charAt(i);
521 switch(ch) {
522 case '{' : {
523 curlyOpen++;
524 if (!globPattern) {
525 globPattern = true;
526 }
527 break;
528 }
529 case '}' : {
530 curlyOpen--;
531 if (curlyOpen == 0 && globPattern) {
532 globPattern = false;
533 }
534 break;
535 }
536 case ',' : {
537 if (!globPattern) {
538 pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
539 pathStart = i + 1 ;
540 }
541 break;
542 }
543 default:
544 continue; // nothing special to do for this character
545 }
546 }
547 pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
548
549 return pathStrings.toArray(new String[0]);
550 }
551
552 /**
553 * Get the list of input {@link Path}s for the map-reduce job.
554 *
555 * @param context The job
556 * @return the list of input {@link Path}s for the map-reduce job.
557 */
558 public static Path[] getInputPaths(JobContext context) {
559 String dirs = context.getConfiguration().get(INPUT_DIR, "");
560 String [] list = StringUtils.split(dirs);
561 Path[] result = new Path[list.length];
562 for (int i = 0; i < list.length; i++) {
563 result[i] = new Path(StringUtils.unEscapeString(list[i]));
564 }
565 return result;
566 }
567
568 }