001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Collections;
024    import java.util.Comparator;
025    import java.util.HashSet;
026    import java.util.IdentityHashMap;
027    import java.util.LinkedList;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    import org.apache.hadoop.classification.InterfaceAudience;
035    import org.apache.hadoop.classification.InterfaceStability;
036    import org.apache.hadoop.fs.BlockLocation;
037    import org.apache.hadoop.fs.FileStatus;
038    import org.apache.hadoop.fs.FileSystem;
039    import org.apache.hadoop.fs.Path;
040    import org.apache.hadoop.fs.PathFilter;
041    import org.apache.hadoop.mapreduce.security.TokenCache;
042    import org.apache.hadoop.net.NetworkTopology;
043    import org.apache.hadoop.net.Node;
044    import org.apache.hadoop.net.NodeBase;
045    import org.apache.hadoop.util.ReflectionUtils;
046    import org.apache.hadoop.util.StringUtils;
047    
048    /** 
049     * A base class for file-based {@link InputFormat}.
050     * 
051     * <p><code>FileInputFormat</code> is the base class for all file-based 
052     * <code>InputFormat</code>s. This provides a generic implementation of
053     * {@link #getSplits(JobConf, int)}.
054     * Subclasses of <code>FileInputFormat</code> can also override the 
055     * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
056     * not split-up and are processed as a whole by {@link Mapper}s.
057     */
058    @InterfaceAudience.Public
059    @InterfaceStability.Stable
060    public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
061    
  public static final Log LOG =
    LogFactory.getLog(FileInputFormat.class);

  // Config key under which getSplits() records the number of input files;
  // shared with the new (mapreduce) API so metrics/loadgen see one key.
  public static final String NUM_INPUT_FILES =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  // A file's tail may exceed the computed split size by up to this factor
  // before an extra split is cut; avoids generating a tiny trailing split.
  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  // Lower bound (bytes) on split size; subclasses raise it via setMinSplitSize().
  private long minSplitSize = 1;
  // Default input filter: skips "hidden" entries, i.e. names starting with
  // "_" (e.g. _SUCCESS, _logs) or "." — applied by listStatus() to every path.
  private static final PathFilter hiddenFileFilter = new PathFilter(){
      public boolean accept(Path p){
        String name = p.getName(); 
        return !name.startsWith("_") && !name.startsWith("."); 
      }
    }; 

  /**
   * Set the minimum split size used by {@link #getSplits(JobConf, int)}.
   *
   * @param minSplitSize the minimum size, in bytes, of each input split
   */
  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }
080    
081      /**
082       * Proxy PathFilter that accepts a path only if all filters given in the
083       * constructor do. Used by the listPaths() to apply the built-in
084       * hiddenFileFilter together with a user provided one (if any).
085       */
086      private static class MultiPathFilter implements PathFilter {
087        private List<PathFilter> filters;
088    
089        public MultiPathFilter(List<PathFilter> filters) {
090          this.filters = filters;
091        }
092    
093        public boolean accept(Path path) {
094          for (PathFilter filter : filters) {
095            if (!filter.accept(path)) {
096              return false;
097            }
098          }
099          return true;
100        }
101      }
102    
103      /**
104       * Is the given filename splitable? Usually, true, but if the file is
105       * stream compressed, it will not be.
106       * 
107       * <code>FileInputFormat</code> implementations can override this and return
108       * <code>false</code> to ensure that individual input files are never split-up
109       * so that {@link Mapper}s process entire files.
110       * 
111       * @param fs the file system that the file is on
112       * @param filename the file name to check
113       * @return is this file splitable?
114       */
115      protected boolean isSplitable(FileSystem fs, Path filename) {
116        return true;
117      }
118      
119      public abstract RecordReader<K, V> getRecordReader(InputSplit split,
120                                                   JobConf job,
121                                                   Reporter reporter)
122        throws IOException;
123    
124      /**
125       * Set a PathFilter to be applied to the input paths for the map-reduce job.
126       *
127       * @param filter the PathFilter class use for filtering the input paths.
128       */
129      public static void setInputPathFilter(JobConf conf,
130                                            Class<? extends PathFilter> filter) {
131        conf.setClass(org.apache.hadoop.mapreduce.lib.input.
132          FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
133      }
134    
135      /**
136       * Get a PathFilter instance of the filter set for the input paths.
137       *
138       * @return the PathFilter instance set for the job, NULL if none has been set.
139       */
140      public static PathFilter getInputPathFilter(JobConf conf) {
141        Class<? extends PathFilter> filterClass = conf.getClass(
142              org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
143              null, PathFilter.class);
144        return (filterClass != null) ?
145            ReflectionUtils.newInstance(filterClass, conf) : null;
146      }
147    
148      /**
149       * Add files in the input path recursively into the results.
150       * @param result
151       *          The List to store all files.
152       * @param fs
153       *          The FileSystem.
154       * @param path
155       *          The input path.
156       * @param inputFilter
157       *          The input filter that can be used to filter files/dirs. 
158       * @throws IOException
159       */
160      protected void addInputPathRecursively(List<FileStatus> result,
161          FileSystem fs, Path path, PathFilter inputFilter) 
162          throws IOException {
163        for(FileStatus stat: fs.listStatus(path, inputFilter)) {
164          if (stat.isDirectory()) {
165            addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
166          } else {
167            result.add(stat);
168          }
169        }          
170      }
171      
  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression. 
   * 
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if zero items.
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
    
    // Whether we need to recursive look into the directory structure
    boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);
    
    List<FileStatus> result = new ArrayList<FileStatus>();
    // Problems are accumulated (not thrown immediately) so that ALL bad
    // input paths are reported to the user in one InvalidInputException.
    List<IOException> errors = new ArrayList<IOException>();
    
    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job); 
      // Expand globs; null means the path doesn't exist at all, an empty
      // array means the pattern matched nothing — reported differently.
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            // A matched directory is expanded one level; deeper levels are
            // only visited when mapred.input.dir.recursive is set.
            for(FileStatus stat: fs.listStatus(globStat.getPath(),
                inputFilter)) {
              if (recursive && stat.isDirectory()) {
                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
              } else {
                result.add(stat);
              }
            }          
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size()); 
    return result.toArray(new FileStatus[result.size()]);
  }
236    
237      /**
238       * A factory that makes the split for this class. It can be overridden
239       * by sub-classes to make sub-types
240       */
241      protected FileSplit makeSplit(Path file, long start, long length, 
242                                    String[] hosts) {
243        return new FileSplit(file, start, length, hosts);
244      }
245    
  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.
   *
   * @param job the job whose input files are split
   * @param numSplits the desired number of splits; a hint used to derive the
   *        per-split goal size, not a hard limit
   * @return one split per file chunk (at least one split per non-empty file)
   * @throws IOException if any input is a directory or block metadata
   *         cannot be fetched
   */ 
  @SuppressWarnings("deprecation")
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    // goalSize: the size each split would have if input were divided evenly
    // into numSplits pieces (guarding against division by zero).
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    // minSize: the larger of the configured minimum and the one set by
    // a subclass via setMinSplitSize().
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(fs, path)) { 
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        // Cut full-size splits while the remainder exceeds splitSize by more
        // than SPLIT_SLOP; the final chunk absorbs up to 10% extra so we
        // never emit a tiny trailing split.
        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[] splitHosts = getSplitHosts(blkLocations, 
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
                               splitHosts));
          bytesRemaining -= splitSize;
        }
        
        // Emit the (possibly over-sized, up to splitSize * SPLIT_SLOP) tail.
        if (bytesRemaining != 0) {
          String[] splitHosts = getSplitHosts(blkLocations, length
              - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              splitHosts));
        }
      } else if (length != 0) {
        // Unsplittable (e.g. stream-compressed) file: one split for the whole file.
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(makeSplit(path, 0, length, splitHosts));
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }
305    
306      protected long computeSplitSize(long goalSize, long minSize,
307                                           long blockSize) {
308        return Math.max(minSize, Math.min(goalSize, blockSize));
309      }
310    
311      protected int getBlockIndex(BlockLocation[] blkLocations, 
312                                  long offset) {
313        for (int i = 0 ; i < blkLocations.length; i++) {
314          // is the offset inside this block?
315          if ((blkLocations[i].getOffset() <= offset) &&
316              (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
317            return i;
318          }
319        }
320        BlockLocation last = blkLocations[blkLocations.length -1];
321        long fileLength = last.getOffset() + last.getLength() -1;
322        throw new IllegalArgumentException("Offset " + offset + 
323                                           " is outside of file (0.." +
324                                           fileLength + ")");
325      }
326    
327      /**
328       * Sets the given comma separated paths as the list of inputs 
329       * for the map-reduce job.
330       * 
331       * @param conf Configuration of the job
332       * @param commaSeparatedPaths Comma separated paths to be set as 
333       *        the list of inputs for the map-reduce job.
334       */
335      public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
336        setInputPaths(conf, StringUtils.stringToPath(
337                            getPathStrings(commaSeparatedPaths)));
338      }
339    
340      /**
341       * Add the given comma separated paths to the list of inputs for
342       *  the map-reduce job.
343       * 
344       * @param conf The configuration of the job 
345       * @param commaSeparatedPaths Comma separated paths to be added to
346       *        the list of inputs for the map-reduce job.
347       */
348      public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
349        for (String str : getPathStrings(commaSeparatedPaths)) {
350          addInputPath(conf, new Path(str));
351        }
352      }
353    
354      /**
355       * Set the array of {@link Path}s as the list of inputs
356       * for the map-reduce job.
357       * 
358       * @param conf Configuration of the job. 
359       * @param inputPaths the {@link Path}s of the input directories/files 
360       * for the map-reduce job.
361       */ 
362      public static void setInputPaths(JobConf conf, Path... inputPaths) {
363        Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
364        StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
365        for(int i = 1; i < inputPaths.length;i++) {
366          str.append(StringUtils.COMMA_STR);
367          path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
368          str.append(StringUtils.escapeString(path.toString()));
369        }
370        conf.set(org.apache.hadoop.mapreduce.lib.input.
371          FileInputFormat.INPUT_DIR, str.toString());
372      }
373    
374      /**
375       * Add a {@link Path} to the list of inputs for the map-reduce job.
376       * 
377       * @param conf The configuration of the job 
378       * @param path {@link Path} to be added to the list of inputs for 
379       *            the map-reduce job.
380       */
381      public static void addInputPath(JobConf conf, Path path ) {
382        path = new Path(conf.getWorkingDirectory(), path);
383        String dirStr = StringUtils.escapeString(path.toString());
384        String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
385          FileInputFormat.INPUT_DIR);
386        conf.set(org.apache.hadoop.mapreduce.lib.input.
387          FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
388          dirs + StringUtils.COMMA_STR + dirStr);
389      }
390             
391      // This method escapes commas in the glob pattern of the given paths.
392      private static String[] getPathStrings(String commaSeparatedPaths) {
393        int length = commaSeparatedPaths.length();
394        int curlyOpen = 0;
395        int pathStart = 0;
396        boolean globPattern = false;
397        List<String> pathStrings = new ArrayList<String>();
398        
399        for (int i=0; i<length; i++) {
400          char ch = commaSeparatedPaths.charAt(i);
401          switch(ch) {
402            case '{' : {
403              curlyOpen++;
404              if (!globPattern) {
405                globPattern = true;
406              }
407              break;
408            }
409            case '}' : {
410              curlyOpen--;
411              if (curlyOpen == 0 && globPattern) {
412                globPattern = false;
413              }
414              break;
415            }
416            case ',' : {
417              if (!globPattern) {
418                pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
419                pathStart = i + 1 ;
420              }
421              break;
422            }
423          }
424        }
425        pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
426        
427        return pathStrings.toArray(new String[0]);
428      }
429      
430      /**
431       * Get the list of input {@link Path}s for the map-reduce job.
432       * 
433       * @param conf The configuration of the job 
434       * @return the list of input {@link Path}s for the map-reduce job.
435       */
436      public static Path[] getInputPaths(JobConf conf) {
437        String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
438          FileInputFormat.INPUT_DIR, "");
439        String [] list = StringUtils.split(dirs);
440        Path[] result = new Path[list.length];
441        for (int i = 0; i < list.length; i++) {
442          result[i] = new Path(StringUtils.unEscapeString(list[i]));
443        }
444        return result;
445      }
446      
447    
448      private void sortInDescendingOrder(List<NodeInfo> mylist) {
449        Collections.sort(mylist, new Comparator<NodeInfo> () {
450          public int compare(NodeInfo obj1, NodeInfo obj2) {
451    
452            if (obj1 == null || obj2 == null)
453              return -1;
454    
455            if (obj1.getValue() == obj2.getValue()) {
456              return 0;
457            }
458            else {
459              return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
460            }
461          }
462        }
463        );
464      }
465    
466      /** 
467       * This function identifies and returns the hosts that contribute 
468       * most for a given split. For calculating the contribution, rack
469       * locality is treated on par with host locality, so hosts from racks
470       * that contribute the most are preferred over hosts on racks that 
471       * contribute less
472       * @param blkLocations The list of block locations
473       * @param offset 
474       * @param splitSize 
475       * @return array of hosts that contribute most to this split
476       * @throws IOException
477       */
478      protected String[] getSplitHosts(BlockLocation[] blkLocations, 
479          long offset, long splitSize, NetworkTopology clusterMap)
480      throws IOException {
481    
482        int startIndex = getBlockIndex(blkLocations, offset);
483    
484        long bytesInThisBlock = blkLocations[startIndex].getOffset() + 
485                              blkLocations[startIndex].getLength() - offset;
486    
487        //If this is the only block, just return
488        if (bytesInThisBlock >= splitSize) {
489          return blkLocations[startIndex].getHosts();
490        }
491    
492        long bytesInFirstBlock = bytesInThisBlock;
493        int index = startIndex + 1;
494        splitSize -= bytesInThisBlock;
495    
496        while (splitSize > 0) {
497          bytesInThisBlock =
498            Math.min(splitSize, blkLocations[index++].getLength());
499          splitSize -= bytesInThisBlock;
500        }
501    
502        long bytesInLastBlock = bytesInThisBlock;
503        int endIndex = index - 1;
504        
505        Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
506        Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
507        String [] allTopos = new String[0];
508    
509        // Build the hierarchy and aggregate the contribution of 
510        // bytes at each level. See TestGetSplitHosts.java 
511    
512        for (index = startIndex; index <= endIndex; index++) {
513    
514          // Establish the bytes in this block
515          if (index == startIndex) {
516            bytesInThisBlock = bytesInFirstBlock;
517          }
518          else if (index == endIndex) {
519            bytesInThisBlock = bytesInLastBlock;
520          }
521          else {
522            bytesInThisBlock = blkLocations[index].getLength();
523          }
524          
525          allTopos = blkLocations[index].getTopologyPaths();
526    
527          // If no topology information is available, just
528          // prefix a fakeRack
529          if (allTopos.length == 0) {
530            allTopos = fakeRacks(blkLocations, index);
531          }
532    
533          // NOTE: This code currently works only for one level of
534          // hierarchy (rack/host). However, it is relatively easy
535          // to extend this to support aggregation at different
536          // levels 
537          
538          for (String topo: allTopos) {
539    
540            Node node, parentNode;
541            NodeInfo nodeInfo, parentNodeInfo;
542    
543            node = clusterMap.getNode(topo);
544    
545            if (node == null) {
546              node = new NodeBase(topo);
547              clusterMap.add(node);
548            }
549            
550            nodeInfo = hostsMap.get(node);
551            
552            if (nodeInfo == null) {
553              nodeInfo = new NodeInfo(node);
554              hostsMap.put(node,nodeInfo);
555              parentNode = node.getParent();
556              parentNodeInfo = racksMap.get(parentNode);
557              if (parentNodeInfo == null) {
558                parentNodeInfo = new NodeInfo(parentNode);
559                racksMap.put(parentNode,parentNodeInfo);
560              }
561              parentNodeInfo.addLeaf(nodeInfo);
562            }
563            else {
564              nodeInfo = hostsMap.get(node);
565              parentNode = node.getParent();
566              parentNodeInfo = racksMap.get(parentNode);
567            }
568    
569            nodeInfo.addValue(index, bytesInThisBlock);
570            parentNodeInfo.addValue(index, bytesInThisBlock);
571    
572          } // for all topos
573        
574        } // for all indices
575    
576        return identifyHosts(allTopos.length, racksMap);
577      }
578      
579      private String[] identifyHosts(int replicationFactor, 
580                                     Map<Node,NodeInfo> racksMap) {
581        
582        String [] retVal = new String[replicationFactor];
583       
584        List <NodeInfo> rackList = new LinkedList<NodeInfo>(); 
585    
586        rackList.addAll(racksMap.values());
587        
588        // Sort the racks based on their contribution to this split
589        sortInDescendingOrder(rackList);
590        
591        boolean done = false;
592        int index = 0;
593        
594        // Get the host list for all our aggregated items, sort
595        // them and return the top entries
596        for (NodeInfo ni: rackList) {
597    
598          Set<NodeInfo> hostSet = ni.getLeaves();
599    
600          List<NodeInfo>hostList = new LinkedList<NodeInfo>();
601          hostList.addAll(hostSet);
602        
603          // Sort the hosts in this rack based on their contribution
604          sortInDescendingOrder(hostList);
605    
606          for (NodeInfo host: hostList) {
607            // Strip out the port number from the host name
608            retVal[index++] = host.node.getName().split(":")[0];
609            if (index == replicationFactor) {
610              done = true;
611              break;
612            }
613          }
614          
615          if (done == true) {
616            break;
617          }
618        }
619        return retVal;
620      }
621      
622      private String[] fakeRacks(BlockLocation[] blkLocations, int index) 
623      throws IOException {
624        String[] allHosts = blkLocations[index].getHosts();
625        String[] allTopos = new String[allHosts.length];
626        for (int i = 0; i < allHosts.length; i++) {
627          allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
628        }
629        return allTopos;
630      }
631    
632    
633      private static class NodeInfo {
634        final Node node;
635        final Set<Integer> blockIds;
636        final Set<NodeInfo> leaves;
637    
638        private long value;
639        
640        NodeInfo(Node node) {
641          this.node = node;
642          blockIds = new HashSet<Integer>();
643          leaves = new HashSet<NodeInfo>();
644        }
645    
646        long getValue() {return value;}
647    
648        void addValue(int blockIndex, long value) {
649          if (blockIds.add(blockIndex) == true) {
650            this.value += value;
651          }
652        }
653    
654        Set<NodeInfo> getLeaves() { return leaves;}
655    
656        void addLeaf(NodeInfo nodeInfo) {
657          leaves.add(nodeInfo);
658        }
659      }
660    }