/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/** 
 * A base class for file-based {@link InputFormat}s.
 * 
 * <p><code>FileInputFormat</code> is the base class for all file-based 
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the 
 * {@link #isSplitable(JobContext, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
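 *
 * <p>A minimal driver sketch (the job name, input path, and the concrete
 * subclass are illustrative, not prescribed by this class):
 * <pre>{@code
 * Job job = Job.getInstance(new Configuration(), "wordcount");
 * job.setInputFormatClass(TextInputFormat.class);
 * FileInputFormat.addInputPath(job, new Path("/user/alice/input"));
 * }</pre>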
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR = 
    "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS = 
    "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private static final PathFilter hiddenFileFilter = new PathFilter(){
      public boolean accept(Path p){
        String name = p.getName(); 
        return !name.startsWith("_") && !name.startsWith("."); 
      }
    }; 

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }
  
  /**
   * Set whether input directories are searched recursively.
   * @param job
   *          the job to modify
   * @param inputDirRecursive
   *          true if subdirectories of the input paths should be descended into
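   *
   * <p>A minimal sketch (the path is illustrative); with this flag set,
   * files in subdirectories of the input paths are included in the input:
   * <pre>{@code
   * FileInputFormat.setInputDirRecursive(job, true);
   * FileInputFormat.addInputPath(job, new Path("/logs"));
   * }</pre>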
   */
  public static void setInputDirRecursive(Job job,
      boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
        inputDirRecursive);
  }

  /**
   * @param job
   *          the job to look at.
   * @return true if the input files are to be read recursively.
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
        false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   * 
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split up,
   * so that {@link Mapper}s process entire files.
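   *
   * <p>A minimal override sketch for formats whose files must be read whole:
   * <pre>{@code
   * protected boolean isSplitable(JobContext context, Path filename) {
   *   return false; // every file becomes exactly one split
   * }
   * }</pre>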
   * 
   * @param context the job context
   * @param filename the file name to check
   * @return true if this file is splitable, false otherwise
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
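   *
   * <p>A minimal filter sketch (the class name and extension are
   * illustrative); the filter class needs a no-argument constructor so it
   * can be instantiated reflectively:
   * <pre>{@code
   * public static class CsvOnlyFilter implements PathFilter {
   *   public boolean accept(Path p) {
   *     return p.getName().endsWith(".csv");
   *   }
   * }
   * FileInputFormat.setInputPathFilter(job, CsvOnlyFilter.class);
   * }</pre>
   *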
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter, 
                                    PathFilter.class);
  }

  /**
   * Set the minimum input split size
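   *
   * <p>The effective split size is {@code max(minSize, min(maxSize,
   * blockSize))}, so raising the minimum above the block size coalesces
   * blocks into larger splits. A minimal sketch (values illustrative):
   * <pre>{@code
   * // with a 128 MB block size this yields 256 MB splits
   * FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
   * }</pre>
   *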
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, or null if none has been set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   * 
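   * <p>A minimal override sketch (the suffix is illustrative):
   * <pre>{@code
   * protected List<FileStatus> listStatus(JobContext job) throws IOException {
   *   List<FileStatus> matched = new ArrayList<FileStatus>();
   *   for (FileStatus stat : super.listStatus(job)) {
   *     if (stat.getPath().getName().endsWith(".log")) {
   *       matched.add(stat);
   *     }
   *   }
   *   return matched;
   * }
   * }</pre>
   * 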
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified, or an input path does not exist.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }
    
    // get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
                                        job.getConfiguration());

    // Whether we need to look recursively into the directory structure
    boolean recursive = getInputDirRecursive(job);
    
    List<IOException> errors = new ArrayList<IOException>();
    
    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);
    
    for (int i=0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration()); 
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size()); 
    return result;
  }
  
  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs. 
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter) 
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }
  
  
  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to make sub-types.
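   *
   * <p>A hypothetical override sketch ({@code MyFileSplit} is illustrative
   * and would need to extend {@link FileSplit}):
   * <pre>{@code
   * protected FileSplit makeSplit(Path file, long start, long length,
   *                               String[] hosts) {
   *   return new MyFileSplit(file, start, length, hosts);
   * }
   * }</pre>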
   */
  protected FileSplit makeSplit(Path file, long start, long length, 
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /** 
   * Generate the list of files and make them into FileSplits.
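   *
   * <p>A worked example, assuming a 300 MB file, a 128 MB block size, and
   * default minimum/maximum split sizes: the computed split size is 128 MB,
   * so three splits are produced, at offsets 0, 128 MB and 256 MB, of
   * 128 MB, 128 MB and 44 MB respectively. A remainder within 10% over the
   * split size (see {@code SPLIT_SLOP}) is kept as one final split rather
   * than being divided further.
   *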
   * @param job the job context
   * @return the list of {@link InputSplit}s for the job
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
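          // SPLIT_SLOP avoids a tiny trailing split: e.g. with a 129 MB file
          // and a 128 MB split size, 129/128 <= 1.1, so the loop is skipped
          // and the remainder is emitted below as a single 129 MB split.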
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                     blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else { 
        // Create an empty hosts array for zero-length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }

  /**
   * Compute the split size: max(minSize, min(maxSize, blockSize)).
   */
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  /**
   * Find the index of the block containing the given file offset.
   */
  protected int getBlockIndex(BlockLocation[] blkLocations, 
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset + 
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs 
   * for the map-reduce job.
   * 
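   * <p>Commas nested inside curly-brace glob patterns are not treated as
   * separators. A minimal sketch (paths illustrative):
   * <pre>{@code
   * // two inputs: the glob /data/{jan,feb} and the directory /data/extra
   * FileInputFormat.setInputPaths(job, "/data/{jan,feb},/data/extra");
   * }</pre>
   * 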
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as 
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job, 
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   * 
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job, 
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   * 
   * @param job The job to modify 
   * @param inputPaths the {@link Path}s of the input directories/files 
   * for the map-reduce job.
   */ 
  public static void setInputPaths(Job job, 
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuilder str = new StringBuilder(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   * 
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for 
   *            the map-reduce job.
   */
  public static void addInputPath(Job job, 
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }
  
  // Splits the comma separated paths, ignoring commas that appear inside
  // curly-brace glob patterns.
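  // For example (illustrative input), "/a/{b,c},/d" yields
  // {"/a/{b,c}", "/d"} rather than {"/a/{b", "c}", "/d"}.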
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();
    
    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
    
    return pathStrings.toArray(new String[0]);
  }
  
  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   * 
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}