/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/**
 * A base class for file-based {@link InputFormat}.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. It provides a generic implementation of
 * {@link #getSplits(JobConf, int)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(FileSystem, Path)} method to ensure that input files are
 * not split up and are processed as a whole by {@link Mapper}s.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public static final Log LOG =
    LogFactory.getLog(FileInputFormat.class);

  @Deprecated
  public static enum Counter {
    BYTES_READ
  }

  public static final String NUM_INPUT_FILES =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  public static final String INPUT_DIR_RECURSIVE =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private long minSplitSize = 1;

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }
  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by {@link #listStatus(JobConf)} to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split up,
   * so that {@link Mapper}s process entire files.
   *
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }
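  // Illustrative sketch (editorial addition, not part of the original source):
  // a subclass that disables splitting so that each map task consumes a whole
  // file. The class name "WholeFileTextInputFormat" is hypothetical.
  //
  //   public class WholeFileTextInputFormat extends FileInputFormat<LongWritable, Text> {
  //     @Override
  //     protected boolean isSplitable(FileSystem fs, Path filename) {
  //       return false;   // never split: one FileSplit per input file
  //     }
  //
  //     @Override
  //     public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
  //         JobConf job, Reporter reporter) throws IOException {
  //       return new LineRecordReader(job, (FileSplit) split);   // read the whole file as lines
  //     }
  //   }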

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                                     JobConf job,
                                                     Reporter reporter)
    throws IOException;

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param conf the job configuration
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(JobConf conf,
                                        Class<? extends PathFilter> filter) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param conf the job configuration
   * @return the PathFilter instance set for the job, <code>null</code> if none has been set.
   */
  public static PathFilter getInputPathFilter(JobConf conf) {
    Class<? extends PathFilter> filterClass = conf.getClass(
          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
          null, PathFilter.class);
    return (filterClass != null) ?
        ReflectionUtils.newInstance(filterClass, conf) : null;
  }
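  // Illustrative usage (editorial addition, not part of the original source):
  // wiring a custom PathFilter into a job from driver code. The filter class
  // below is hypothetical; at listing time it is combined with the built-in
  // hiddenFileFilter via MultiPathFilter.
  //
  //   public static class SkipTmpFilter implements PathFilter {
  //     public boolean accept(Path p) {
  //       return !p.getName().endsWith(".tmp");   // ignore in-flight temporary files
  //     }
  //   }
  //
  //   JobConf job = new JobConf();
  //   FileInputFormat.setInputPaths(job, new Path("/data/events"));
  //   FileInputFormat.setInputPathFilter(job, SkipTmpFilter.class);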

  /**
   * Add files in the input path recursively into the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter)
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }

  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified or an input path is
   *         invalid (does not exist or matches no files).
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // Get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);

    // Whether we need to recursively look into the directory structure
    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();

    // Create a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job);
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result.toArray(new FileStatus[result.size()]);
  }
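  // Illustrative usage (editorial addition, not part of the original source):
  // by default only the immediate children of each input directory are listed;
  // setting INPUT_DIR_RECURSIVE makes listStatus() descend into sub-directories.
  //
  //   JobConf job = new JobConf();
  //   FileInputFormat.setInputPaths(job, new Path("/logs"));        // e.g. /logs/2024/01/part-0000
  //   job.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);    // recurse into the date directories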

  /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to produce split sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length,
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }

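  // Illustrative sketch (editorial addition, not part of the original source):
  // a subclass can override makeSplit() so getSplits() emits a FileSplit
  // subtype carrying extra metadata. "TaggedFileSplit" is a hypothetical type.
  //
  //   @Override
  //   protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
  //     return new TaggedFileSplit(file, start, length, hosts, "raw");
  //   }
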
  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            String[] splitHosts = getSplitHosts(blkLocations,
                length - bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                splitHosts));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[] splitHosts = getSplitHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts));
          }
        } else {
          String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts));
        }
      } else {
        // Create empty hosts array for zero-length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }
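  // Worked example (editorial addition, not part of the original source) of the
  // split arithmetic above. Assume a 300 MB file, 128 MB blocks, minSize = 1 and
  // goalSize >= 128 MB, so splitSize = computeSplitSize(...) = 128 MB:
  //
  //   300/128 = 2.34 > SPLIT_SLOP   ->  split [0 MB, 128 MB)
  //   172/128 = 1.34 > SPLIT_SLOP   ->  split [128 MB, 256 MB)
  //    44/128 = 0.34 <= SPLIT_SLOP  ->  final split [256 MB, 300 MB) of 44 MB
  //
  // A 140 MB file, by contrast, becomes a single 140 MB split, because
  // 140/128 = 1.09 <= SPLIT_SLOP; the 10% slop avoids a tiny trailing split.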

  protected long computeSplitSize(long goalSize, long minSize,
                                  long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
    setInputPaths(conf, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(conf, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param conf Configuration of the job.
   * @param inputPaths the {@link Path}s of the input directories/files
   * for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, Path... inputPaths) {
    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @param path {@link Path} to be added to the list of inputs for
   *            the map-reduce job.
   */
  public static void addInputPath(JobConf conf, Path path) {
    path = new Path(conf.getWorkingDirectory(), path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR);
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
      dirs + StringUtils.COMMA_STR + dirStr);
  }
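  // Illustrative usage (editorial addition, not part of the original source):
  // setInputPaths() replaces the configured input list, while addInputPath()
  // and addInputPaths() append to it; relative paths are resolved against the
  // job's working directory.
  //
  //   JobConf job = new JobConf();
  //   FileInputFormat.setInputPaths(job, new Path("/warehouse/clicks"));   // start a fresh list
  //   FileInputFormat.addInputPath(job, new Path("/warehouse/views"));     // append one more directory
  //   FileInputFormat.addInputPaths(job, "/warehouse/extra1,/warehouse/extra2");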

  // Splits the comma-separated path list, ignoring commas that appear inside
  // curly-brace glob patterns (so a glob such as {a,b} is kept intact).
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }
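  // Worked example (editorial addition, not part of the original source):
  // commas inside curly-brace globs are not treated as separators, so
  //
  //   getPathStrings("/data/a,/data/{2019,2020}/b")
  //
  // yields two entries, "/data/a" and "/data/{2019,2020}/b", rather than three.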

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param conf The configuration of the job
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobConf conf) {
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }

  // Sorts NodeInfo entries by contributed bytes, largest first.
  private void sortInDescendingOrder(List<NodeInfo> mylist) {
    Collections.sort(mylist, new Comparator<NodeInfo>() {
      public int compare(NodeInfo obj1, NodeInfo obj2) {

        if (obj1 == null || obj2 == null)
          return -1;

        if (obj1.getValue() == obj2.getValue()) {
          return 0;
        } else {
          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
        }
      }
    });
  }

  /**
   * This function identifies and returns the hosts that contribute
   * most to a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the size of the split in bytes
   * @param clusterMap the network topology used to resolve host and rack nodes
   * @return array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations,
      long offset, long splitSize, NetworkTopology clusterMap)
      throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() +
                            blkLocations[startIndex].getLength() - offset;

    // If the split fits entirely within this block, just return its hosts
    if (bytesInThisBlock >= splitSize) {
      return blkLocations[startIndex].getHosts();
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
        Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;

    Map<Node, NodeInfo> hostsMap = new IdentityHashMap<Node, NodeInfo>();
    Map<Node, NodeInfo> racksMap = new IdentityHashMap<Node, NodeInfo>();
    String[] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of
    // bytes at each level. See TestGetSplitHosts.java

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      } else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      } else {
        bytesInThisBlock = blkLocations[index].getLength();
      }

      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels

      for (String topo: allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }

        nodeInfo = hostsMap.get(node);

        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node, nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode, parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        } else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos

    } // for all indices

    return identifyHosts(allTopos.length, racksMap);
  }
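  // Illustrative example (editorial addition, not part of the original source):
  // for a split covering 128 MB of block B1 (replicas on h1, h2) and 64 MB of
  // block B2 (replicas on h2, h3), the per-host contributions are
  // h2 = 192 MB, h1 = 128 MB, h3 = 64 MB. Racks are ranked by their aggregate
  // contribution first, then hosts within each rack, and the number of hosts
  // returned equals the replica count of the last block examined, so h2 would
  // head the preferred-host list for this split.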

  private String[] identifyHosts(int replicationFactor,
                                 Map<Node, NodeInfo> racksMap) {

    String[] retVal = new String[replicationFactor];

    List<NodeInfo> rackList = new LinkedList<NodeInfo>();

    rackList.addAll(racksMap.values());

    // Sort the racks based on their contribution to this split
    sortInDescendingOrder(rackList);

    boolean done = false;
    int index = 0;

    // Get the host list for all our aggregated items, sort
    // them and return the top entries
    for (NodeInfo ni: rackList) {

      Set<NodeInfo> hostSet = ni.getLeaves();

      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
      hostList.addAll(hostSet);

      // Sort the hosts in this rack based on their contribution
      sortInDescendingOrder(hostList);

      for (NodeInfo host: hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
      }

      if (done) {
        break;
      }
    }
    return retVal;
  }

  private String[] fakeRacks(BlockLocation[] blkLocations, int index)
      throws IOException {
    String[] allHosts = blkLocations[index].getHosts();
    String[] allTopos = new String[allHosts.length];
    for (int i = 0; i < allHosts.length; i++) {
      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
    }
    return allTopos;
  }

  private static class NodeInfo {
    final Node node;
    final Set<Integer> blockIds;
    final Set<NodeInfo> leaves;

    private long value;

    NodeInfo(Node node) {
      this.node = node;
      blockIds = new HashSet<Integer>();
      leaves = new HashSet<NodeInfo>();
    }

    long getValue() { return value; }

    void addValue(int blockIndex, long value) {
      // Count each block's bytes only once per node
      if (blockIds.add(blockIndex)) {
        this.value += value;
      }
    }

    Set<NodeInfo> getLeaves() { return leaves; }

    void addLeaf(NodeInfo nodeInfo) {
      leaves.add(nodeInfo);
    }
  }
}