/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.Lists;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. It provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
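 *
 * <p>A minimal usage sketch in a job driver (the input format, path and
 * split size are illustrative, not prescribed by this class):</p>
 * <pre>
 *   Job job = Job.getInstance(new Configuration(), "example");
 *   job.setInputFormatClass(TextInputFormat.class);
 *   FileInputFormat.addInputPath(job, new Path("/data/input"));
 *   FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024); // 64 MB
 * </pre>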
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
    "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
    "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  private static final PathFilter hiddenFileFilter = new PathFilter() {
      public boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
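   *
   * <p>A minimal sketch of the composition, mirroring what listStatus()
   * does (the path is illustrative):</p>
   * <pre>
   *   List&lt;PathFilter&gt; filters = new ArrayList&lt;PathFilter&gt;();
   *   filters.add(hiddenFileFilter);
   *   PathFilter combined = new MultiPathFilter(filters);
   *   combined.accept(new Path("/data/_logs")); // false: hidden prefix
   * </pre>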
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Set whether the given job should list input directories recursively.
   * @param job
   *          the job to modify
   * @param inputDirRecursive
   *          true if input directories should be listed recursively
   */
  public static void setInputDirRecursive(Job job,
      boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
        inputDirRecursive);
  }

  /**
   * Get whether the given job should list input directories recursively.
   * @param job
   *          the job to look at.
   * @return true if the files in the input directories are to be read
   *         recursively
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
        false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <p><code>FileInputFormat</code> implementations can override this and
   * return <code>false</code> to ensure that individual input files are
   * never split up, so that {@link Mapper}s process entire files.
   *
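   * <p>A minimal sketch of a subclass that never splits its input (the
   * class name is illustrative):</p>
   * <pre>
   *   public class WholeFileTextInputFormat extends TextInputFormat {
   *     &#64;Override
   *     protected boolean isSplitable(JobContext context, Path file) {
   *       return false;
   *     }
   *   }
   * </pre>
   *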
   * @param context the job context
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
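   *
   * <p>A minimal sketch (the filter class is illustrative; it needs a
   * no-argument constructor, since it is instantiated by reflection):</p>
   * <pre>
   *   public class TmpFileFilter implements PathFilter {
   *     public boolean accept(Path p) {
   *       return !p.getName().endsWith(".tmp");
   *     }
   *   }
   *   FileInputFormat.setInputPathFilter(job, TmpFileFilter.class);
   * </pre>
   *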
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
                                    PathFilter.class);
  }

  /**
   * Set the minimum input split size.
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size.
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                              Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, null if none has been
   *         set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return a list of FileStatus objects
   * @throws IOException if no input paths are specified.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    // Whether we need to recursively look into the directory structure
    boolean recursive = getInputDirRecursive(job);

    // create a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result = null;

    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
    StopWatch sw = new StopWatch().start();
    if (numThreads == 1) {
      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job.getConfiguration(), dirs, recursive, inputFilter, true);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        // restore the interrupt status and preserve the cause
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while getting file statuses", e);
      }
      result = Lists.newArrayList(locatedFiles);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: "
          + sw.now(TimeUnit.MILLISECONDS));
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }

  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }

  /**
   * Generate the list of files and make them into FileSplits.
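   *
   * <p>A worked example of the arithmetic, assuming a 300 MB file, a
   * 128 MB block size, and default minimum/maximum split sizes: the split
   * size is 128 MB, so the file yields splits of 128 MB, 128 MB and
   * 44 MB. The 44 MB tail survives as its own split only because each
   * iteration checks the remainder against <code>SPLIT_SLOP</code> (1.1):
   * a 140 MB file would produce a single 140 MB split, since
   * 140/128 &lt; 1.1.</p>
   *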
   * @param job the job context
   * @return the list of {@link InputSplit}s for the job
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }

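  /**
   * Compute the split size as
   * <code>max(minSize, min(maxSize, blockSize))</code>. For example, with a
   * 128 MB block and the defaults (<code>minSize</code> 1,
   * <code>maxSize</code> <code>Long.MAX_VALUE</code>) the split size equals
   * the block size; raising <code>minSize</code> above the block size forces
   * larger splits, and lowering <code>maxSize</code> below it forces smaller
   * ones.
   */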
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

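  /**
   * Find the index of the block that contains the given file offset, so the
   * split covering that offset can be placed on the hosts holding the block.
   * @param blkLocations the block locations of the file
   * @param offset the byte offset into the file
   * @return the index of the block containing <code>offset</code>
   */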
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
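   * <p>A minimal sketch (the paths are illustrative); note that commas
   * inside curly-brace glob groups are kept as part of the path rather
   * than treated as separators:</p>
   * <pre>
   *   FileInputFormat.setInputPaths(job, "/data/a,/data/{2020,2021}/part-*");
   *   // two input paths: /data/a and /data/{2020,2021}/part-*
   * </pre>
   *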
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   * for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuilder str =
        new StringBuilder(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
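   * <p>Repeated calls append rather than replace; a minimal sketch (the
   * paths are illustrative):</p>
   * <pre>
   *   FileInputFormat.addInputPath(job, new Path("/logs/day1"));
   *   FileInputFormat.addInputPath(job, new Path("/logs/day2"));
   *   // INPUT_DIR now lists both paths, comma separated
   * </pre>
   *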
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *            the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  // Splits the given comma separated paths on commas, treating commas
  // inside curly-brace glob groups as part of the path rather than as
  // separators.
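  // For example (illustrative input), "/a,/b/{x,y},/c" is split into
  // { "/a", "/b/{x,y}", "/c" }: the comma inside the braces is preserved.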
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}