/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
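 *
 * <p>A minimal usage sketch (the job name and input path are illustrative):
 * <pre>{@code
 * Job job = Job.getInstance(new Configuration(), "example");
 * job.setInputFormatClass(TextInputFormat.class);
 * FileInputFormat.addInputPath(job, new Path("/data/input"));
 * }</pre>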
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
    "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
    "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
    "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  @Deprecated
  public enum Counter {
    BYTES_READ
  }

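  // By default, paths whose final component starts with '_' (e.g. _SUCCESS,
  // _logs) or '.' are treated as hidden and excluded from the input.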
  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Set whether input directories should be scanned recursively.
   * @param job
   *          the job to modify
   * @param inputDirRecursive
   *          whether input directories should be scanned recursively
   */
  public static void setInputDirRecursive(Job job,
                                          boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
                                      inputDirRecursive);
  }

  /**
   * @param job
   *          the job to look at.
   * @return true if the input files should be read recursively.
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
                                             false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <p><code>FileInputFormat</code> implementations can override this and
   * return <code>false</code> to ensure that individual input files are
   * never split up, so that {@link Mapper}s process entire files.
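   *
   * <p>A sketch of an override that disables splitting entirely:
   * <pre>{@code
   * protected boolean isSplitable(JobContext context, Path filename) {
   *   return false; // each file is processed whole by a single mapper
   * }
   * }</pre>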
   *
   * @param context the job context
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
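   *
   * <p>For example, a filter class (hypothetical) that accepts only
   * <code>.log</code> files:
   * <pre>{@code
   * public static class LogFileFilter implements PathFilter {
   *   public boolean accept(Path path) {
   *     return path.getName().endsWith(".log");
   *   }
   * }
   * FileInputFormat.setInputPathFilter(job, LogFileFilter.class);
   * }</pre>
   * Note that the filter class needs a no-argument constructor, since it is
   * instantiated via {@link ReflectionUtils}.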
   * @param job the job to modify
   * @param filter the PathFilter class used for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
                                    PathFilter.class);
  }

  /**
   * Set the minimum input split size.
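   *
   * <p>For example, to keep splits between 64 MB and 256 MB (illustrative
   * values):
   * <pre>{@code
   * FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);
   * FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
   * }</pre>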
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size.
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                              Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, null if none has been
   *         set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
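   *
   * A sketch of such an override (the <code>.avro</code> pattern is only
   * illustrative):
   * <pre>{@code
   * protected List<FileStatus> listStatus(JobContext job) throws IOException {
   *   List<FileStatus> filtered = new ArrayList<FileStatus>();
   *   for (FileStatus f : super.listStatus(job)) {
   *     if (f.getPath().getName().matches(".*\\.avro")) {
   *       filtered.add(f);
   *     }
   *   }
   *   return filtered;
   * }
   * }</pre>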
   *
   * @param job the job to list input paths for
   * @return a list of FileStatus objects
   * @throws IOException if no input paths are specified, or if an input
   *         path does not exist or matches no files.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    // Whether we need to recursively look into the directory structure
    boolean recursive = getInputDirRecursive(job);

    // Creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result = null;

    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
    Stopwatch sw = new Stopwatch().start();
    if (numThreads == 1) {
      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher =
            new LocatedFileStatusFetcher(
                job.getConfiguration(), dirs, recursive, inputFilter, true);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        // Restore the interrupt status and preserve the original cause.
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while getting file statuses", e);
      }
      result = Lists.newArrayList(locatedFiles);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
    }
    LOG.info("Total input paths to process: " + result.size());
    return result;
  }

  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException if listing the path fails.
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }

  /**
   * Generate the list of files and make them into FileSplits.
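   *
   * <p>Worked example (illustrative numbers): with a 128 MB block size and
   * default min/max split sizes, {@code splitSize} is 128 MB. A 260 MB file
   * first yields a 128 MB split; the remaining 132 MB is emitted as one
   * final split, because 132/128 is below {@code SPLIT_SLOP} (1.1), which
   * avoids producing a 4 MB runt split.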
   * @param job the job context
   * @return the list of {@link InputSplit}s for the job
   * @throws IOException if the input paths cannot be listed
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
              blkLocations[0].getCachedHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }

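  /**
   * Compute the split size: the block size clamped to the configured
   * [minSize, maxSize] range, i.e.
   * {@code max(minSize, min(maxSize, blockSize))}. For example, with a
   * 128 MB block size, a 256 MB minimum forces 256 MB splits, while a
   * 64 MB maximum forces 64 MB splits.
   */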
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

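  /**
   * Find the index of the block containing the given file offset.
   * @param blkLocations the block locations of the file, in file order
   * @param offset the offset into the file, in bytes
   * @return the index of the block containing the offset
   * @throws IllegalArgumentException if the offset is past the last block
   */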
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma-separated paths as the list of inputs
   * for the map-reduce job.
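   * Commas inside curly-brace glob groups are not treated as path
   * separators; for example, <code>"/logs/*.gz,/data/{a,b}"</code> names
   * exactly two input paths.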
   *
   * @param job the job
   * @param commaSeparatedPaths Comma-separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma-separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma-separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuilder str =
        new StringBuilder(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  // Splits the given comma-separated path list, treating commas inside
  // curly-brace glob groups as part of the glob rather than as separators.
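  // For example, "/a/{b,c},/d" is split into ["/a/{b,c}", "/d"]; the comma
  // inside the curly-brace glob group is not a separator.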
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{': {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}': {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',': {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}