001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Collections;
024    import java.util.Comparator;
025    import java.util.HashSet;
026    import java.util.IdentityHashMap;
027    import java.util.LinkedList;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    import org.apache.hadoop.classification.InterfaceAudience;
035    import org.apache.hadoop.classification.InterfaceStability;
036    import org.apache.hadoop.fs.BlockLocation;
037    import org.apache.hadoop.fs.FileStatus;
038    import org.apache.hadoop.fs.FileSystem;
039    import org.apache.hadoop.fs.LocatedFileStatus;
040    import org.apache.hadoop.fs.Path;
041    import org.apache.hadoop.fs.PathFilter;
042    import org.apache.hadoop.fs.RemoteIterator;
043    import org.apache.hadoop.mapreduce.security.TokenCache;
044    import org.apache.hadoop.net.NetworkTopology;
045    import org.apache.hadoop.net.Node;
046    import org.apache.hadoop.net.NodeBase;
047    import org.apache.hadoop.util.ReflectionUtils;
048    import org.apache.hadoop.util.StringUtils;
049    
050    import com.google.common.base.Stopwatch;
051    import com.google.common.collect.Iterables;
052    
053    /** 
054     * A base class for file-based {@link InputFormat}.
055     * 
056     * <p><code>FileInputFormat</code> is the base class for all file-based 
057     * <code>InputFormat</code>s. This provides a generic implementation of
058     * {@link #getSplits(JobConf, int)}.
059     * Subclasses of <code>FileInputFormat</code> can also override the 
 * {@link #isSplitable(FileSystem, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
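 *
 * <p>A minimal, illustrative driver sketch (the {@code MyDriver} class and the
 * input directory are hypothetical; {@code TextInputFormat} is one concrete
 * subclass):
 * <pre>{@code
 * JobConf job = new JobConf(new Configuration(), MyDriver.class);
 * job.setInputFormat(TextInputFormat.class);
 * FileInputFormat.setInputPaths(job, new Path("/data/input"));
 * InputSplit[] splits = job.getInputFormat().getSplits(job, 4);
 * }</pre>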
062     */
063    @InterfaceAudience.Public
064    @InterfaceStability.Stable
065    public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
066    
067      public static final Log LOG =
068        LogFactory.getLog(FileInputFormat.class);
069      
070      @Deprecated
071      public static enum Counter { 
072        BYTES_READ
073      }
074    
075      public static final String NUM_INPUT_FILES =
076        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
077      
078      public static final String INPUT_DIR_RECURSIVE = 
079        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;
080    
081    
082      private static final double SPLIT_SLOP = 1.1;   // 10% slop
083    
084      private long minSplitSize = 1;
085      private static final PathFilter hiddenFileFilter = new PathFilter(){
086          public boolean accept(Path p){
087            String name = p.getName(); 
088            return !name.startsWith("_") && !name.startsWith("."); 
089          }
090        }; 
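
  /**
   * Set this format's minimum split size. {@link #getSplits(JobConf, int)}
   * uses the larger of this value and the configured
   * {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat#SPLIT_MINSIZE}
   * as the effective minimum when computing split sizes.
   */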
091      protected void setMinSplitSize(long minSplitSize) {
092        this.minSplitSize = minSplitSize;
093      }
094    
095      /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
099       */
100      private static class MultiPathFilter implements PathFilter {
101        private List<PathFilter> filters;
102    
103        public MultiPathFilter(List<PathFilter> filters) {
104          this.filters = filters;
105        }
106    
107        public boolean accept(Path path) {
108          for (PathFilter filter : filters) {
109            if (!filter.accept(path)) {
110              return false;
111            }
112          }
113          return true;
114        }
115      }
116    
  /**
   * Is the given filename splittable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split up,
   * so that {@link Mapper}s process entire files.
   *
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return is this file splittable?
   */
129      protected boolean isSplitable(FileSystem fs, Path filename) {
130        return true;
131      }
132      
133      public abstract RecordReader<K, V> getRecordReader(InputSplit split,
134                                                   JobConf job,
135                                                   Reporter reporter)
136        throws IOException;
137    
138      /**
139       * Set a PathFilter to be applied to the input paths for the map-reduce job.
140       *
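   * <p>A small illustrative sketch (the {@code TxtOnlyFilter} class is
   * hypothetical, not part of Hadoop); the filter is applied in addition to
   * the built-in hidden-file filter:
   * <pre>{@code
   * public class TxtOnlyFilter implements PathFilter {
   *   public boolean accept(Path p) {
   *     return p.getName().endsWith(".txt");
   *   }
   * }
   * FileInputFormat.setInputPathFilter(conf, TxtOnlyFilter.class);
   * }</pre>
   *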
   * @param conf the {@link JobConf} to store the filter class in
   * @param filter the PathFilter class used for filtering the input paths.
142       */
143      public static void setInputPathFilter(JobConf conf,
144                                            Class<? extends PathFilter> filter) {
145        conf.setClass(org.apache.hadoop.mapreduce.lib.input.
146          FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
147      }
148    
149      /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param conf the {@link JobConf} to look up the filter in
   * @return the PathFilter instance set for the job, or null if none has been set.
153       */
154      public static PathFilter getInputPathFilter(JobConf conf) {
155        Class<? extends PathFilter> filterClass = conf.getClass(
156              org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
157              null, PathFilter.class);
158        return (filterClass != null) ?
159            ReflectionUtils.newInstance(filterClass, conf) : null;
160      }
161    
162      /**
   * Recursively add files under the given input path to the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException if the file listing fails.
173       */
174      protected void addInputPathRecursively(List<FileStatus> result,
175          FileSystem fs, Path path, PathFilter inputFilter) 
176          throws IOException {
177        RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
178        while (iter.hasNext()) {
179          LocatedFileStatus stat = iter.next();
180          if (inputFilter.accept(stat.getPath())) {
181            if (stat.isDirectory()) {
182              addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
183            } else {
184              result.add(stat);
185            }
186          }
187        }
188      }
189      
190      /** List input directories.
191       * Subclasses may override to, e.g., select only files matching a regular
192       * expression. 
193       * 
194       * @param job the job to list input paths for
195       * @return array of FileStatus objects
   * @throws IOException if no input paths are specified, or if an input
   *         path does not exist or matches no files.
197       */
198      protected FileStatus[] listStatus(JobConf job) throws IOException {
199        Path[] dirs = getInputPaths(job);
200        if (dirs.length == 0) {
201          throw new IOException("No input paths specified in job");
202        }
203    
    // get tokens for all the required FileSystems.
205        TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
206        
    // Whether we need to look recursively into the directory structure
208        boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
209    
210        // creates a MultiPathFilter with the hiddenFileFilter and the
211        // user provided one (if any).
212        List<PathFilter> filters = new ArrayList<PathFilter>();
213        filters.add(hiddenFileFilter);
214        PathFilter jobFilter = getInputPathFilter(job);
215        if (jobFilter != null) {
216          filters.add(jobFilter);
217        }
218        PathFilter inputFilter = new MultiPathFilter(filters);
219    
220        FileStatus[] result;
221        int numThreads = job
222            .getInt(
223                org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
224                org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
225        
226        Stopwatch sw = new Stopwatch().start();
227        if (numThreads == 1) {
228          List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive); 
229          result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
230        } else {
231          Iterable<FileStatus> locatedFiles = null;
232          try {
233            
234            LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
235                job, dirs, recursive, inputFilter, false);
236            locatedFiles = locatedFileStatusFetcher.getFileStatuses();
237          } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses", e);
239          }
240          result = Iterables.toArray(locatedFiles, FileStatus.class);
241        }
242    
243        sw.stop();
244        if (LOG.isDebugEnabled()) {
245          LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
246        }
247        LOG.info("Total input paths to process : " + result.length);
248        return result;
249      }
250      
251      private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
252          PathFilter inputFilter, boolean recursive) throws IOException {
253        List<FileStatus> result = new ArrayList<FileStatus>();
254        List<IOException> errors = new ArrayList<IOException>();
255        for (Path p: dirs) {
256          FileSystem fs = p.getFileSystem(job); 
257          FileStatus[] matches = fs.globStatus(p, inputFilter);
258          if (matches == null) {
259            errors.add(new IOException("Input path does not exist: " + p));
260          } else if (matches.length == 0) {
261            errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
262          } else {
263            for (FileStatus globStat: matches) {
264              if (globStat.isDirectory()) {
265                RemoteIterator<LocatedFileStatus> iter =
266                    fs.listLocatedStatus(globStat.getPath());
267                while (iter.hasNext()) {
268                  LocatedFileStatus stat = iter.next();
269                  if (inputFilter.accept(stat.getPath())) {
270                    if (recursive && stat.isDirectory()) {
271                      addInputPathRecursively(result, fs, stat.getPath(),
272                          inputFilter);
273                    } else {
274                      result.add(stat);
275                    }
276                  }
277                }
278              } else {
279                result.add(globStat);
280              }
281            }
282          }
283        }
284        if (!errors.isEmpty()) {
285          throw new InvalidInputException(errors);
286        }
287        return result;
288      }
289    
290      /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to make sub-types.
293       */
294      protected FileSplit makeSplit(Path file, long start, long length, 
295                                    String[] hosts) {
296        return new FileSplit(file, start, length, hosts);
297      }
298      
299      /**
   * A factory that makes the split for this class. It can be overridden
   * by subclasses to make sub-types.
302       */
303      protected FileSplit makeSplit(Path file, long start, long length, 
304                                    String[] hosts, String[] inMemoryHosts) {
305        return new FileSplit(file, start, length, hosts, inMemoryHosts);
306      }
307    
308      /** Splits files returned by {@link #listStatus(JobConf)} when
309       * they're too big.*/ 
310      public InputSplit[] getSplits(JobConf job, int numSplits)
311        throws IOException {
312        Stopwatch sw = new Stopwatch().start();
313        FileStatus[] files = listStatus(job);
314        
315        // Save the number of input files for metrics/loadgen
316        job.setLong(NUM_INPUT_FILES, files.length);
317        long totalSize = 0;                           // compute total size
318        for (FileStatus file: files) {                // check we have valid files
319          if (file.isDirectory()) {
320            throw new IOException("Not a file: "+ file.getPath());
321          }
322          totalSize += file.getLen();
323        }
324    
325        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
326        long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
327          FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
328    
329        // generate splits
330        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
331        NetworkTopology clusterMap = new NetworkTopology();
332        for (FileStatus file: files) {
333          Path path = file.getPath();
334          long length = file.getLen();
335          if (length != 0) {
336            FileSystem fs = path.getFileSystem(job);
337            BlockLocation[] blkLocations;
338            if (file instanceof LocatedFileStatus) {
339              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
340            } else {
341              blkLocations = fs.getFileBlockLocations(file, 0, length);
342            }
343            if (isSplitable(fs, path)) {
344              long blockSize = file.getBlockSize();
345              long splitSize = computeSplitSize(goalSize, minSize, blockSize);
346    
347              long bytesRemaining = length;
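          // Keep carving off splitSize-byte chunks while the remainder is more
          // than SPLIT_SLOP (10%) larger than splitSize; any smaller remainder
          // is folded into the final split below to avoid a tiny trailing split.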
348              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
349                String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
350                    length-bytesRemaining, splitSize, clusterMap);
351                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
352                    splitHosts[0], splitHosts[1]));
353                bytesRemaining -= splitSize;
354              }
355    
356              if (bytesRemaining != 0) {
357                String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
358                    - bytesRemaining, bytesRemaining, clusterMap);
359                splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
360                    splitHosts[0], splitHosts[1]));
361              }
362            } else {
363              String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
364              splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
365            }
366          } else { 
        // Create an empty hosts array for zero-length files
368            splits.add(makeSplit(path, 0, length, new String[0]));
369          }
370        }
371        sw.stop();
372        if (LOG.isDebugEnabled()) {
373          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
374              + ", TimeTaken: " + sw.elapsedMillis());
375        }
376        return splits.toArray(new FileSplit[splits.size()]);
377      }
378    
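  /**
   * Compute the split size as {@code max(minSize, min(goalSize, blockSize))}.
   * For example (illustrative numbers only), with goalSize = 200 MB,
   * minSize = 1 and blockSize = 128 MB, the split size is 128 MB; if
   * goalSize were 64 MB instead, the split size would be 64 MB.
   */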
379      protected long computeSplitSize(long goalSize, long minSize,
380                                           long blockSize) {
381        return Math.max(minSize, Math.min(goalSize, blockSize));
382      }
383    
384      protected int getBlockIndex(BlockLocation[] blkLocations, 
385                                  long offset) {
386        for (int i = 0 ; i < blkLocations.length; i++) {
387          // is the offset inside this block?
388          if ((blkLocations[i].getOffset() <= offset) &&
389              (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
390            return i;
391          }
392        }
393        BlockLocation last = blkLocations[blkLocations.length -1];
394        long fileLength = last.getOffset() + last.getLength() -1;
395        throw new IllegalArgumentException("Offset " + offset + 
396                                           " is outside of file (0.." +
397                                           fileLength + ")");
398      }
399    
400      /**
401       * Sets the given comma separated paths as the list of inputs 
402       * for the map-reduce job.
403       * 
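   * <p>For example (the paths shown are hypothetical), commas inside a glob
   * pattern do not split the path:
   * <pre>{@code
   * FileInputFormat.setInputPaths(conf, "/data/a,/data/logs/{2019,2020}");
   * }</pre>
   *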
404       * @param conf Configuration of the job
405       * @param commaSeparatedPaths Comma separated paths to be set as 
406       *        the list of inputs for the map-reduce job.
407       */
408      public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
409        setInputPaths(conf, StringUtils.stringToPath(
410                            getPathStrings(commaSeparatedPaths)));
411      }
412    
413      /**
414       * Add the given comma separated paths to the list of inputs for
415       *  the map-reduce job.
416       * 
417       * @param conf The configuration of the job 
418       * @param commaSeparatedPaths Comma separated paths to be added to
419       *        the list of inputs for the map-reduce job.
420       */
421      public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
422        for (String str : getPathStrings(commaSeparatedPaths)) {
423          addInputPath(conf, new Path(str));
424        }
425      }
426    
427      /**
428       * Set the array of {@link Path}s as the list of inputs
429       * for the map-reduce job.
430       * 
431       * @param conf Configuration of the job. 
432       * @param inputPaths the {@link Path}s of the input directories/files 
433       * for the map-reduce job.
434       */ 
435      public static void setInputPaths(JobConf conf, Path... inputPaths) {
436        Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuilder str = new StringBuilder(StringUtils.escapeString(path.toString()));
438        for(int i = 1; i < inputPaths.length;i++) {
439          str.append(StringUtils.COMMA_STR);
440          path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
441          str.append(StringUtils.escapeString(path.toString()));
442        }
443        conf.set(org.apache.hadoop.mapreduce.lib.input.
444          FileInputFormat.INPUT_DIR, str.toString());
445      }
446    
447      /**
448       * Add a {@link Path} to the list of inputs for the map-reduce job.
449       * 
450       * @param conf The configuration of the job 
451       * @param path {@link Path} to be added to the list of inputs for 
452       *            the map-reduce job.
453       */
454      public static void addInputPath(JobConf conf, Path path ) {
455        path = new Path(conf.getWorkingDirectory(), path);
456        String dirStr = StringUtils.escapeString(path.toString());
457        String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
458          FileInputFormat.INPUT_DIR);
459        conf.set(org.apache.hadoop.mapreduce.lib.input.
460          FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
461          dirs + StringUtils.COMMA_STR + dirStr);
462      }
463             
  // Splits the comma separated path list, treating commas that appear inside
  // glob patterns (i.e. within curly braces) as part of the path rather than
  // as separators. For example, "/a,/b/{x,y}" yields ["/a", "/b/{x,y}"].
465      private static String[] getPathStrings(String commaSeparatedPaths) {
466        int length = commaSeparatedPaths.length();
467        int curlyOpen = 0;
468        int pathStart = 0;
469        boolean globPattern = false;
470        List<String> pathStrings = new ArrayList<String>();
471        
472        for (int i=0; i<length; i++) {
473          char ch = commaSeparatedPaths.charAt(i);
474          switch(ch) {
475            case '{' : {
476              curlyOpen++;
477              if (!globPattern) {
478                globPattern = true;
479              }
480              break;
481            }
482            case '}' : {
483              curlyOpen--;
484              if (curlyOpen == 0 && globPattern) {
485                globPattern = false;
486              }
487              break;
488            }
489            case ',' : {
490              if (!globPattern) {
491                pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
492                pathStart = i + 1 ;
493              }
494              break;
495            }
496            default:
497              continue; // nothing special to do for this character
498          }
499        }
500        pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
501        
502        return pathStrings.toArray(new String[0]);
503      }
504      
505      /**
506       * Get the list of input {@link Path}s for the map-reduce job.
507       * 
508       * @param conf The configuration of the job 
509       * @return the list of input {@link Path}s for the map-reduce job.
510       */
511      public static Path[] getInputPaths(JobConf conf) {
512        String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
513          FileInputFormat.INPUT_DIR, "");
514        String [] list = StringUtils.split(dirs);
515        Path[] result = new Path[list.length];
516        for (int i = 0; i < list.length; i++) {
517          result[i] = new Path(StringUtils.unEscapeString(list[i]));
518        }
519        return result;
520      }
521      
522    
523      private void sortInDescendingOrder(List<NodeInfo> mylist) {
524        Collections.sort(mylist, new Comparator<NodeInfo> () {
525          public int compare(NodeInfo obj1, NodeInfo obj2) {
526    
527            if (obj1 == null || obj2 == null)
528              return -1;
529    
530            if (obj1.getValue() == obj2.getValue()) {
531              return 0;
532            }
533            else {
534              return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
535            }
536          }
537        }
538        );
539      }
540    
541      /** 
   * This function identifies and returns the hosts that contribute
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the size of the split in bytes
   * @param clusterMap the network topology used to resolve rack membership
   * @return an array of hosts that contribute most to this split
   * @throws IOException
552       */
553      protected String[] getSplitHosts(BlockLocation[] blkLocations, 
554          long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
555        return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
556            clusterMap)[0];
557      }
558      
559      /** 
   * This function identifies and returns the hosts that contribute
   * most for a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the size of the split in bytes
   * @param clusterMap the network topology used to resolve rack membership
   * @return two arrays - one of hosts that contribute most to this split, and
   *    one of hosts that contribute most to this split that have the data
   *    cached on them
   * @throws IOException
572       */
573      private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, 
574          long offset, long splitSize, NetworkTopology clusterMap)
575      throws IOException {
576    
577        int startIndex = getBlockIndex(blkLocations, offset);
578    
579        long bytesInThisBlock = blkLocations[startIndex].getOffset() + 
580                              blkLocations[startIndex].getLength() - offset;
581    
    // If the split fits entirely within this block, just return its hosts
583        if (bytesInThisBlock >= splitSize) {
584          return new String[][] { blkLocations[startIndex].getHosts(),
585              blkLocations[startIndex].getCachedHosts() };
586        }
587    
588        long bytesInFirstBlock = bytesInThisBlock;
589        int index = startIndex + 1;
590        splitSize -= bytesInThisBlock;
591    
592        while (splitSize > 0) {
593          bytesInThisBlock =
594            Math.min(splitSize, blkLocations[index++].getLength());
595          splitSize -= bytesInThisBlock;
596        }
597    
598        long bytesInLastBlock = bytesInThisBlock;
599        int endIndex = index - 1;
600        
601        Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
602        Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
603        String [] allTopos = new String[0];
604    
605        // Build the hierarchy and aggregate the contribution of 
606        // bytes at each level. See TestGetSplitHosts.java 
607    
608        for (index = startIndex; index <= endIndex; index++) {
609    
610          // Establish the bytes in this block
611          if (index == startIndex) {
612            bytesInThisBlock = bytesInFirstBlock;
613          }
614          else if (index == endIndex) {
615            bytesInThisBlock = bytesInLastBlock;
616          }
617          else {
618            bytesInThisBlock = blkLocations[index].getLength();
619          }
620          
621          allTopos = blkLocations[index].getTopologyPaths();
622    
      // If no topology information is available, fall back to placing
      // every host in a single default (fake) rack
625          if (allTopos.length == 0) {
626            allTopos = fakeRacks(blkLocations, index);
627          }
628    
629          // NOTE: This code currently works only for one level of
630          // hierarchy (rack/host). However, it is relatively easy
631          // to extend this to support aggregation at different
632          // levels 
633          
634          for (String topo: allTopos) {
635    
636            Node node, parentNode;
637            NodeInfo nodeInfo, parentNodeInfo;
638    
639            node = clusterMap.getNode(topo);
640    
641            if (node == null) {
642              node = new NodeBase(topo);
643              clusterMap.add(node);
644            }
645            
646            nodeInfo = hostsMap.get(node);
647            
648            if (nodeInfo == null) {
649              nodeInfo = new NodeInfo(node);
650              hostsMap.put(node,nodeInfo);
651              parentNode = node.getParent();
652              parentNodeInfo = racksMap.get(parentNode);
653              if (parentNodeInfo == null) {
654                parentNodeInfo = new NodeInfo(parentNode);
655                racksMap.put(parentNode,parentNodeInfo);
656              }
657              parentNodeInfo.addLeaf(nodeInfo);
658            }
659            else {
660              nodeInfo = hostsMap.get(node);
661              parentNode = node.getParent();
662              parentNodeInfo = racksMap.get(parentNode);
663            }
664    
665            nodeInfo.addValue(index, bytesInThisBlock);
666            parentNodeInfo.addValue(index, bytesInThisBlock);
667    
668          } // for all topos
669        
670        } // for all indices
671    
672        // We don't yet support cached hosts when bytesInThisBlock > splitSize
673        return new String[][] { identifyHosts(allTopos.length, racksMap),
674            new String[0]};
675      }
676      
677      private String[] identifyHosts(int replicationFactor, 
678                                     Map<Node,NodeInfo> racksMap) {
679        
680        String [] retVal = new String[replicationFactor];
681       
682        List <NodeInfo> rackList = new LinkedList<NodeInfo>(); 
683    
684        rackList.addAll(racksMap.values());
685        
686        // Sort the racks based on their contribution to this split
687        sortInDescendingOrder(rackList);
688        
689        boolean done = false;
690        int index = 0;
691        
692        // Get the host list for all our aggregated items, sort
693        // them and return the top entries
694        for (NodeInfo ni: rackList) {
695    
696          Set<NodeInfo> hostSet = ni.getLeaves();
697    
      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
699          hostList.addAll(hostSet);
700        
701          // Sort the hosts in this rack based on their contribution
702          sortInDescendingOrder(hostList);
703    
704          for (NodeInfo host: hostList) {
705            // Strip out the port number from the host name
706            retVal[index++] = host.node.getName().split(":")[0];
707            if (index == replicationFactor) {
708              done = true;
709              break;
710            }
711          }
712          
      if (done) {
714            break;
715          }
716        }
717        return retVal;
718      }
719      
720      private String[] fakeRacks(BlockLocation[] blkLocations, int index) 
721      throws IOException {
722        String[] allHosts = blkLocations[index].getHosts();
723        String[] allTopos = new String[allHosts.length];
724        for (int i = 0; i < allHosts.length; i++) {
725          allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
726        }
727        return allTopos;
728      }
729    
730    
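  /**
   * Per-node (host or rack) bookkeeping used when selecting split hosts:
   * accumulates the number of bytes the node contributes to a split and, for
   * racks, the set of host-level NodeInfo leaves beneath it. The blockIds set
   * ensures each block's bytes are counted at most once per node.
   */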
731      private static class NodeInfo {
732        final Node node;
733        final Set<Integer> blockIds;
734        final Set<NodeInfo> leaves;
735    
736        private long value;
737        
738        NodeInfo(Node node) {
739          this.node = node;
740          blockIds = new HashSet<Integer>();
741          leaves = new HashSet<NodeInfo>();
742        }
743    
744        long getValue() {return value;}
745    
746        void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex)) {
748            this.value += value;
749          }
750        }
751    
752        Set<NodeInfo> getLeaves() { return leaves;}
753    
754        void addLeaf(NodeInfo nodeInfo) {
755          leaves.add(nodeInfo);
756        }
757      }
758    }