/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
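 *
 * <p>A minimal usage sketch (the job name and input path are illustrative):
 * <pre>{@code
 * Job job = Job.getInstance(new Configuration(), "example");
 * job.setInputFormatClass(TextInputFormat.class);
 * FileInputFormat.addInputPath(job, new Path("/data/input"));
 * }</pre>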
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  public static final String INPUT_DIR =
    "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
    "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
    "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
    "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
    "mapreduce.input.fileinputformat.numinputfiles";
  public static final String INPUT_DIR_RECURSIVE =
    "mapreduce.input.fileinputformat.input.dir.recursive";
  public static final String LIST_STATUS_NUM_THREADS =
    "mapreduce.input.fileinputformat.list-status.num-threads";
  public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  @Deprecated
  public enum Counter {
    BYTES_READ
  }

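  // By default, paths whose final component starts with '_' (e.g. _SUCCESS,
  // _logs) or '.' are treated as hidden and excluded from the input.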
  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Set whether input directories should be scanned recursively.
   * @param job
   *          the job to modify
   * @param inputDirRecursive
   *          whether input directories should be scanned recursively
   */
  public static void setInputDirRecursive(Job job,
                                          boolean inputDirRecursive) {
    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
                                      inputDirRecursive);
  }

  /**
   * @param job
   *          the job to look at.
   * @return true if the input files should be read recursively.
   */
  public static boolean getInputDirRecursive(JobContext job) {
    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
                                             false);
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <p><code>FileInputFormat</code> implementations can override this and
   * return <code>false</code> to ensure that individual input files are
   * never split up, so that {@link Mapper}s process entire files.
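   *
   * <p>A sketch of an override that disables splitting entirely:
   * <pre>{@code
   * protected boolean isSplitable(JobContext context, Path filename) {
   *   return false; // each file is processed whole by a single mapper
   * }
   * }</pre>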
   *
   * @param context the job context
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
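   *
   * <p>For example, a filter class (hypothetical) that accepts only
   * <code>.log</code> files:
   * <pre>{@code
   * public static class LogFileFilter implements PathFilter {
   *   public boolean accept(Path path) {
   *     return path.getName().endsWith(".log");
   *   }
   * }
   * FileInputFormat.setInputPathFilter(job, LogFileFilter.class);
   * }</pre>
   * Note that the filter class needs a no-argument constructor, since it is
   * instantiated via {@link ReflectionUtils}.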
   * @param job the job to modify
   * @param filter the PathFilter class used for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
                                    PathFilter.class);
  }

  /**
   * Set the minimum input split size.
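   *
   * <p>For example, to keep splits between 64 MB and 256 MB (illustrative
   * values):
   * <pre>{@code
   * FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);
   * FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
   * }</pre>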
   * @param job the job to modify
   * @param size the minimum size
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MINSIZE, size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /**
   * Set the maximum split size.
   * @param job the job to modify
   * @param size the maximum split size
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at.
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                              Long.MAX_VALUE);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, null if none has been
   *         set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
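   *
   * A sketch of such an override (the <code>.avro</code> pattern is only
   * illustrative):
   * <pre>{@code
   * protected List<FileStatus> listStatus(JobContext job) throws IOException {
   *   List<FileStatus> filtered = new ArrayList<FileStatus>();
   *   for (FileStatus f : super.listStatus(job)) {
   *     if (f.getPath().getName().matches(".*\\.avro")) {
   *       filtered.add(f);
   *     }
   *   }
   *   return filtered;
   * }
   * }</pre>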
   *
   * @param job the job to list input paths for
   * @return a list of FileStatus objects
   * @throws IOException if no input paths are specified, or if an input
   *         path does not exist or matches no files.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    // Whether we need to recursively look into the directory structure
    boolean recursive = getInputDirRecursive(job);

    // Creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    List<FileStatus> result = null;

    int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
        DEFAULT_LIST_STATUS_NUM_THREADS);
    Stopwatch sw = new Stopwatch().start();
    if (numThreads == 1) {
      result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        LocatedFileStatusFetcher locatedFileStatusFetcher =
            new LocatedFileStatusFetcher(
                job.getConfiguration(), dirs, recursive, inputFilter, true);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        // Restore the interrupt status and preserve the original cause.
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while getting file statuses", e);
      }
      result = Lists.newArrayList(locatedFiles);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
    }
    LOG.info("Total input paths to process: " + result.size());
    return result;
  }

  private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException if listing the path fails.
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }

  /**
   * Generate the list of files and make them into FileSplits.
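   *
   * <p>Worked example (illustrative numbers): with a 128 MB block size and
   * default min/max split sizes, {@code splitSize} is 128 MB. A 260 MB file
   * first yields a 128 MB split; the remaining 132 MB is emitted as one
   * final split, because 132/128 is below {@code SPLIT_SLOP} (1.1), which
   * avoids producing a 4 MB runt split.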
   * @param job the job context
   * @return the list of {@link InputSplit}s for the job
   * @throws IOException if the input paths cannot be listed
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
              blkLocations[0].getCachedHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }

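  /**
   * Compute the split size: the block size clamped to the configured
   * [minSize, maxSize] range, i.e.
   * {@code max(minSize, min(maxSize, blockSize))}. For example, with a
   * 128 MB block size, a 256 MB minimum forces 256 MB splits, while a
   * 64 MB maximum forces 64 MB splits.
   */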
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

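  /**
   * Find the index of the block containing the given file offset.
   * @param blkLocations the block locations of the file, in file order
   * @param offset the offset into the file, in bytes
   * @return the index of the block containing the offset
   * @throws IllegalArgumentException if the offset is past the last block
   */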
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma-separated paths as the list of inputs
   * for the map-reduce job.
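   * Commas inside curly-brace glob groups are not treated as path
   * separators; for example, <code>"/logs/*.gz,/data/{a,b}"</code> names
   * exactly two input paths.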
   *
   * @param job the job
   * @param commaSeparatedPaths Comma-separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma-separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma-separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuilder str =
        new StringBuilder(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }

  // Splits the given comma-separated path list, treating commas inside
  // curly-brace glob groups as part of the glob rather than as separators.
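  // For example, "/a/{b,c},/d" is split into ["/a/{b,c}", "/d"]; the comma
  // inside the curly-brace glob group is not a separator.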
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{': {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}': {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',': {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get(INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

}