001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.input;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileStatus;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.LocatedFileStatus;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.PathFilter;
035import org.apache.hadoop.fs.BlockLocation;
036import org.apache.hadoop.fs.RemoteIterator;
037import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
038import org.apache.hadoop.mapred.SplitLocationInfo;
039import org.apache.hadoop.mapreduce.InputFormat;
040import org.apache.hadoop.mapreduce.InputSplit;
041import org.apache.hadoop.mapreduce.Job;
042import org.apache.hadoop.mapreduce.JobContext;
043import org.apache.hadoop.mapreduce.Mapper;
044import org.apache.hadoop.mapreduce.security.TokenCache;
045import org.apache.hadoop.util.ReflectionUtils;
046import org.apache.hadoop.util.StringUtils;
047
048import com.google.common.base.Stopwatch;
049import com.google.common.collect.Lists;
050
051/** 
052 * A base class for file-based {@link InputFormat}s.
053 * 
054 * <p><code>FileInputFormat</code> is the base class for all file-based 
055 * <code>InputFormat</code>s. This provides a generic implementation of
056 * {@link #getSplits(JobContext)}.
057 * Subclasses of <code>FileInputFormat</code> can also override the 
058 * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
059 * not split-up and are processed as a whole by {@link Mapper}s.
060 */
061@InterfaceAudience.Public
062@InterfaceStability.Stable
063public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
064  public static final String INPUT_DIR = 
065    "mapreduce.input.fileinputformat.inputdir";
066  public static final String SPLIT_MAXSIZE = 
067    "mapreduce.input.fileinputformat.split.maxsize";
068  public static final String SPLIT_MINSIZE = 
069    "mapreduce.input.fileinputformat.split.minsize";
070  public static final String PATHFILTER_CLASS = 
071    "mapreduce.input.pathFilter.class";
072  public static final String NUM_INPUT_FILES =
073    "mapreduce.input.fileinputformat.numinputfiles";
074  public static final String INPUT_DIR_RECURSIVE =
075    "mapreduce.input.fileinputformat.input.dir.recursive";
076  public static final String LIST_STATUS_NUM_THREADS =
077      "mapreduce.input.fileinputformat.list-status.num-threads";
078  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
079
080  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
081
082  private static final double SPLIT_SLOP = 1.1;   // 10% slop
083  
084  @Deprecated
085  public static enum Counter { 
086    BYTES_READ
087  }
088
089  private static final PathFilter hiddenFileFilter = new PathFilter(){
090      public boolean accept(Path p){
091        String name = p.getName(); 
092        return !name.startsWith("_") && !name.startsWith("."); 
093      }
094    }; 
095
096  /**
097   * Proxy PathFilter that accepts a path only if all filters given in the
098   * constructor do. Used by the listPaths() to apply the built-in
099   * hiddenFileFilter together with a user provided one (if any).
100   */
101  private static class MultiPathFilter implements PathFilter {
102    private List<PathFilter> filters;
103
104    public MultiPathFilter(List<PathFilter> filters) {
105      this.filters = filters;
106    }
107
108    public boolean accept(Path path) {
109      for (PathFilter filter : filters) {
110        if (!filter.accept(path)) {
111          return false;
112        }
113      }
114      return true;
115    }
116  }
117  
118  /**
119   * @param job
120   *          the job to modify
121   * @param inputDirRecursive
122   */
123  public static void setInputDirRecursive(Job job,
124      boolean inputDirRecursive) {
125    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
126        inputDirRecursive);
127  }
128 
129  /**
130   * @param job
131   *          the job to look at.
132   * @return should the files to be read recursively?
133   */
134  public static boolean getInputDirRecursive(JobContext job) {
135    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
136        false);
137  }
138
139  /**
140   * Get the lower bound on split size imposed by the format.
141   * @return the number of bytes of the minimal split for this format
142   */
143  protected long getFormatMinSplitSize() {
144    return 1;
145  }
146
147  /**
148   * Is the given filename splitable? Usually, true, but if the file is
149   * stream compressed, it will not be.
150   * 
151   * <code>FileInputFormat</code> implementations can override this and return
152   * <code>false</code> to ensure that individual input files are never split-up
153   * so that {@link Mapper}s process entire files.
154   * 
155   * @param context the job context
156   * @param filename the file name to check
157   * @return is this file splitable?
158   */
159  protected boolean isSplitable(JobContext context, Path filename) {
160    return true;
161  }
162
163  /**
164   * Set a PathFilter to be applied to the input paths for the map-reduce job.
165   * @param job the job to modify
166   * @param filter the PathFilter class use for filtering the input paths.
167   */
168  public static void setInputPathFilter(Job job,
169                                        Class<? extends PathFilter> filter) {
170    job.getConfiguration().setClass(PATHFILTER_CLASS, filter, 
171                                    PathFilter.class);
172  }
173
174  /**
175   * Set the minimum input split size
176   * @param job the job to modify
177   * @param size the minimum size
178   */
179  public static void setMinInputSplitSize(Job job,
180                                          long size) {
181    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
182  }
183
184  /**
185   * Get the minimum split size
186   * @param job the job
187   * @return the minimum number of bytes that can be in a split
188   */
189  public static long getMinSplitSize(JobContext job) {
190    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
191  }
192
193  /**
194   * Set the maximum split size
195   * @param job the job to modify
196   * @param size the maximum split size
197   */
198  public static void setMaxInputSplitSize(Job job,
199                                          long size) {
200    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
201  }
202
203  /**
204   * Get the maximum split size.
205   * @param context the job to look at.
206   * @return the maximum number of bytes a split can include
207   */
208  public static long getMaxSplitSize(JobContext context) {
209    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
210                                              Long.MAX_VALUE);
211  }
212
213  /**
214   * Get a PathFilter instance of the filter set for the input paths.
215   *
216   * @return the PathFilter instance set for the job, NULL if none has been set.
217   */
218  public static PathFilter getInputPathFilter(JobContext context) {
219    Configuration conf = context.getConfiguration();
220    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
221        PathFilter.class);
222    return (filterClass != null) ?
223        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
224  }
225
226  /** List input directories.
227   * Subclasses may override to, e.g., select only files matching a regular
228   * expression. 
229   * 
230   * @param job the job to list input paths for
231   * @return array of FileStatus objects
232   * @throws IOException if zero items.
233   */
234  protected List<FileStatus> listStatus(JobContext job
235                                        ) throws IOException {
236    Path[] dirs = getInputPaths(job);
237    if (dirs.length == 0) {
238      throw new IOException("No input paths specified in job");
239    }
240    
241    // get tokens for all the required FileSystems..
242    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
243                                        job.getConfiguration());
244
245    // Whether we need to recursive look into the directory structure
246    boolean recursive = getInputDirRecursive(job);
247
248    // creates a MultiPathFilter with the hiddenFileFilter and the
249    // user provided one (if any).
250    List<PathFilter> filters = new ArrayList<PathFilter>();
251    filters.add(hiddenFileFilter);
252    PathFilter jobFilter = getInputPathFilter(job);
253    if (jobFilter != null) {
254      filters.add(jobFilter);
255    }
256    PathFilter inputFilter = new MultiPathFilter(filters);
257    
258    List<FileStatus> result = null;
259
260    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
261        DEFAULT_LIST_STATUS_NUM_THREADS);
262    Stopwatch sw = new Stopwatch().start();
263    if (numThreads == 1) {
264      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
265    } else {
266      Iterable<FileStatus> locatedFiles = null;
267      try {
268        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
269            job.getConfiguration(), dirs, recursive, inputFilter, true);
270        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
271      } catch (InterruptedException e) {
272        throw new IOException("Interrupted while getting file statuses");
273      }
274      result = Lists.newArrayList(locatedFiles);
275    }
276    
277    sw.stop();
278    if (LOG.isDebugEnabled()) {
279      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
280    }
281    LOG.info("Total input paths to process : " + result.size()); 
282    return result;
283  }
284
285  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
286      PathFilter inputFilter, boolean recursive) throws IOException {
287    List<FileStatus> result = new ArrayList<FileStatus>();
288    List<IOException> errors = new ArrayList<IOException>();
289    for (int i=0; i < dirs.length; ++i) {
290      Path p = dirs[i];
291      FileSystem fs = p.getFileSystem(job.getConfiguration()); 
292      FileStatus[] matches = fs.globStatus(p, inputFilter);
293      if (matches == null) {
294        errors.add(new IOException("Input path does not exist: " + p));
295      } else if (matches.length == 0) {
296        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
297      } else {
298        for (FileStatus globStat: matches) {
299          if (globStat.isDirectory()) {
300            RemoteIterator<LocatedFileStatus> iter =
301                fs.listLocatedStatus(globStat.getPath());
302            while (iter.hasNext()) {
303              LocatedFileStatus stat = iter.next();
304              if (inputFilter.accept(stat.getPath())) {
305                if (recursive && stat.isDirectory()) {
306                  addInputPathRecursively(result, fs, stat.getPath(),
307                      inputFilter);
308                } else {
309                  result.add(stat);
310                }
311              }
312            }
313          } else {
314            result.add(globStat);
315          }
316        }
317      }
318    }
319
320    if (!errors.isEmpty()) {
321      throw new InvalidInputException(errors);
322    }
323    return result;
324  }
325  
326  /**
327   * Add files in the input path recursively into the results.
328   * @param result
329   *          The List to store all files.
330   * @param fs
331   *          The FileSystem.
332   * @param path
333   *          The input path.
334   * @param inputFilter
335   *          The input filter that can be used to filter files/dirs. 
336   * @throws IOException
337   */
338  protected void addInputPathRecursively(List<FileStatus> result,
339      FileSystem fs, Path path, PathFilter inputFilter) 
340      throws IOException {
341    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
342    while (iter.hasNext()) {
343      LocatedFileStatus stat = iter.next();
344      if (inputFilter.accept(stat.getPath())) {
345        if (stat.isDirectory()) {
346          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
347        } else {
348          result.add(stat);
349        }
350      }
351    }
352  }
353  
354  
355  /**
356   * A factory that makes the split for this class. It can be overridden
357   * by sub-classes to make sub-types
358   */
359  protected FileSplit makeSplit(Path file, long start, long length, 
360                                String[] hosts) {
361    return new FileSplit(file, start, length, hosts);
362  }
363  
364  /**
365   * A factory that makes the split for this class. It can be overridden
366   * by sub-classes to make sub-types
367   */
368  protected FileSplit makeSplit(Path file, long start, long length, 
369                                String[] hosts, String[] inMemoryHosts) {
370    return new FileSplit(file, start, length, hosts, inMemoryHosts);
371  }
372
373  /** 
374   * Generate the list of files and make them into FileSplits.
375   * @param job the job context
376   * @throws IOException
377   */
378  public List<InputSplit> getSplits(JobContext job) throws IOException {
379    Stopwatch sw = new Stopwatch().start();
380    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
381    long maxSize = getMaxSplitSize(job);
382
383    // generate splits
384    List<InputSplit> splits = new ArrayList<InputSplit>();
385    List<FileStatus> files = listStatus(job);
386    for (FileStatus file: files) {
387      Path path = file.getPath();
388      long length = file.getLen();
389      if (length != 0) {
390        BlockLocation[] blkLocations;
391        if (file instanceof LocatedFileStatus) {
392          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
393        } else {
394          FileSystem fs = path.getFileSystem(job.getConfiguration());
395          blkLocations = fs.getFileBlockLocations(file, 0, length);
396        }
397        if (isSplitable(job, path)) {
398          long blockSize = file.getBlockSize();
399          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
400
401          long bytesRemaining = length;
402          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
403            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
404            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
405                        blkLocations[blkIndex].getHosts(),
406                        blkLocations[blkIndex].getCachedHosts()));
407            bytesRemaining -= splitSize;
408          }
409
410          if (bytesRemaining != 0) {
411            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
412            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
413                       blkLocations[blkIndex].getHosts(),
414                       blkLocations[blkIndex].getCachedHosts()));
415          }
416        } else { // not splitable
417          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
418                      blkLocations[0].getCachedHosts()));
419        }
420      } else { 
421        //Create empty hosts array for zero length files
422        splits.add(makeSplit(path, 0, length, new String[0]));
423      }
424    }
425    // Save the number of input files for metrics/loadgen
426    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
427    sw.stop();
428    if (LOG.isDebugEnabled()) {
429      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
430          + ", TimeTaken: " + sw.elapsedMillis());
431    }
432    return splits;
433  }
434
435  protected long computeSplitSize(long blockSize, long minSize,
436                                  long maxSize) {
437    return Math.max(minSize, Math.min(maxSize, blockSize));
438  }
439
440  protected int getBlockIndex(BlockLocation[] blkLocations, 
441                              long offset) {
442    for (int i = 0 ; i < blkLocations.length; i++) {
443      // is the offset inside this block?
444      if ((blkLocations[i].getOffset() <= offset) &&
445          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
446        return i;
447      }
448    }
449    BlockLocation last = blkLocations[blkLocations.length -1];
450    long fileLength = last.getOffset() + last.getLength() -1;
451    throw new IllegalArgumentException("Offset " + offset + 
452                                       " is outside of file (0.." +
453                                       fileLength + ")");
454  }
455
456  /**
457   * Sets the given comma separated paths as the list of inputs 
458   * for the map-reduce job.
459   * 
460   * @param job the job
461   * @param commaSeparatedPaths Comma separated paths to be set as 
462   *        the list of inputs for the map-reduce job.
463   */
464  public static void setInputPaths(Job job, 
465                                   String commaSeparatedPaths
466                                   ) throws IOException {
467    setInputPaths(job, StringUtils.stringToPath(
468                        getPathStrings(commaSeparatedPaths)));
469  }
470
471  /**
472   * Add the given comma separated paths to the list of inputs for
473   *  the map-reduce job.
474   * 
475   * @param job The job to modify
476   * @param commaSeparatedPaths Comma separated paths to be added to
477   *        the list of inputs for the map-reduce job.
478   */
479  public static void addInputPaths(Job job, 
480                                   String commaSeparatedPaths
481                                   ) throws IOException {
482    for (String str : getPathStrings(commaSeparatedPaths)) {
483      addInputPath(job, new Path(str));
484    }
485  }
486
487  /**
488   * Set the array of {@link Path}s as the list of inputs
489   * for the map-reduce job.
490   * 
491   * @param job The job to modify 
492   * @param inputPaths the {@link Path}s of the input directories/files 
493   * for the map-reduce job.
494   */ 
495  public static void setInputPaths(Job job, 
496                                   Path... inputPaths) throws IOException {
497    Configuration conf = job.getConfiguration();
498    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
499    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
500    for(int i = 1; i < inputPaths.length;i++) {
501      str.append(StringUtils.COMMA_STR);
502      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
503      str.append(StringUtils.escapeString(path.toString()));
504    }
505    conf.set(INPUT_DIR, str.toString());
506  }
507
508  /**
509   * Add a {@link Path} to the list of inputs for the map-reduce job.
510   * 
511   * @param job The {@link Job} to modify
512   * @param path {@link Path} to be added to the list of inputs for 
513   *            the map-reduce job.
514   */
515  public static void addInputPath(Job job, 
516                                  Path path) throws IOException {
517    Configuration conf = job.getConfiguration();
518    path = path.getFileSystem(conf).makeQualified(path);
519    String dirStr = StringUtils.escapeString(path.toString());
520    String dirs = conf.get(INPUT_DIR);
521    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
522  }
523  
524  // This method escapes commas in the glob pattern of the given paths.
525  private static String[] getPathStrings(String commaSeparatedPaths) {
526    int length = commaSeparatedPaths.length();
527    int curlyOpen = 0;
528    int pathStart = 0;
529    boolean globPattern = false;
530    List<String> pathStrings = new ArrayList<String>();
531    
532    for (int i=0; i<length; i++) {
533      char ch = commaSeparatedPaths.charAt(i);
534      switch(ch) {
535        case '{' : {
536          curlyOpen++;
537          if (!globPattern) {
538            globPattern = true;
539          }
540          break;
541        }
542        case '}' : {
543          curlyOpen--;
544          if (curlyOpen == 0 && globPattern) {
545            globPattern = false;
546          }
547          break;
548        }
549        case ',' : {
550          if (!globPattern) {
551            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
552            pathStart = i + 1 ;
553          }
554          break;
555        }
556        default:
557          continue; // nothing special to do for this character
558      }
559    }
560    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
561    
562    return pathStrings.toArray(new String[0]);
563  }
564  
565  /**
566   * Get the list of input {@link Path}s for the map-reduce job.
567   * 
568   * @param context The job
569   * @return the list of input {@link Path}s for the map-reduce job.
570   */
571  public static Path[] getInputPaths(JobContext context) {
572    String dirs = context.getConfiguration().get(INPUT_DIR, "");
573    String [] list = StringUtils.split(dirs);
574    Path[] result = new Path[list.length];
575    for (int i = 0; i < list.length; i++) {
576      result[i] = new Path(StringUtils.unEscapeString(list[i]));
577    }
578    return result;
579  }
580
581}