/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;

/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
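 *
 * <p>A minimal usage sketch (the input path and split size are illustrative
 * only), using the concrete subclass {@link TextInputFormat}:
 * <pre>{@code
 * Job job = Job.getInstance(new Configuration());
 * job.setInputFormatClass(TextInputFormat.class);
 * FileInputFormat.addInputPath(job, new Path("/data/input"));
 * FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024); // 256 MB
 * }</pre>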
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
    "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
    "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
      "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  private static final PathFilter hiddenFileFilter = new PathFilter(){
      public boolean accept(Path p){
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Set whether the input directories should be traversed recursively.
   * @param job
   *          the job to modify
   * @param inputDirRecursive
   *          true to read the input directories recursively
   */
  public static void setInputDirRecursive(Job job,
      boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
        inputDirRecursive);
  }

  /**
   * @param job
   *          the job to look at.
   * @return whether the files should be read recursively
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
        false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split
   * up, so that {@link Mapper}s process entire files.
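   *
   * <p>For example, an override that keeps every file whole might be
   * sketched as:
   * <pre>{@code
   * protected boolean isSplitable(JobContext context, Path filename) {
   *   return false; // each input file goes to a single mapper
   * }
   * }</pre>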
   *
   * @param context the job context
   * @param filename the file name to check
   * @return whether this file is splitable
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
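   *
   * <p>For example, a hypothetical filter that accepts only ".log" files:
   * <pre>{@code
   * public static class LogFileFilter implements PathFilter {
   *   public boolean accept(Path p) {
   *     return p.getName().endsWith(".log");
   *   }
   * }
   * FileInputFormat.setInputPathFilter(job, LogFileFilter.class);
   * }</pre>
   * The built-in filter for hidden files (names starting with "_" or ".")
   * is applied in addition to the filter configured here.
   *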
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
                                    PathFilter.class);
  }

  /**
   * Set the minimum input split size
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                              Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, or null if none has been set
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return a list of FileStatus objects for the input files
   * @throws IOException if no input paths are specified or an input path
   *         does not exist
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    // Whether we need to look into the directory structure recursively
    boolean recursive = getInputDirRecursive(job);

    // Create a MultiPathFilter from the hiddenFileFilter and the
    // user-provided filter (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result = null;

    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
    Stopwatch sw = new Stopwatch().start();
    if (numThreads == 1) {
      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    } else {
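      // List the input paths in parallel with up to LIST_STATUS_NUM_THREADS
      // threads; the fetcher returns LocatedFileStatus objects, so block
      // locations are already attached to the results.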
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job.getConfiguration(), dirs, recursive, inputFilter, true);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses", e);
      }
      result = Lists.newArrayList(locatedFiles);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
    }
    LOG.info("Total input paths to process: " + result.size());
    return result;
  }

  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (int i=0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException if listing the input path fails
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to create sub-types of FileSplit.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @return the list of input splits for the job
   * @throws IOException if the input files cannot be listed
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
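          // Carve off splitSize-byte chunks while the remainder is more
          // than SPLIT_SLOP (10%) larger than splitSize; the final split
          // may thus be up to 1.1 * splitSize, avoiding a tiny last split.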
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                     blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else {
        // Create empty hosts array for zero-length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }

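  /**
   * Compute the split size as max(minSize, min(maxSize, blockSize)).
   * For example, with the defaults (minSize = 1, maxSize = Long.MAX_VALUE)
   * and a 128 MB block size, splits are 128 MB each.
   */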
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

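  /**
   * Find the index of the block that contains the given byte offset,
   * using a linear scan of the block locations.
   * @throws IllegalArgumentException if the offset is outside the file
   */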
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
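   *
   * <p>For example (the paths are illustrative; commas inside curly-brace
   * globs are kept as part of the glob rather than treated as separators):
   * <pre>{@code
   * FileInputFormat.setInputPaths(job, "/data/2014/{01,02},/data/extra");
   * // sets the two input paths "/data/2014/{01,02}" and "/data/extra"
   * }</pre>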
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   * for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuilder str = new StringBuilder(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *            the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  // Splits the given comma separated paths into individual path strings,
  // treating commas inside curly-brace glob patterns (e.g. "{a,b}") as part
  // of the pattern rather than as separators.
  // For example, "/p/{x,y},/q" yields ["/p/{x,y}", "/q"].
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i=0; i<length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }


  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}