/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;

/** 
 * A base class for file-based {@link InputFormat}.
 * 
 * <p><code>FileInputFormat</code> is the base class for all file-based 
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobConf, int)}.
 * Subclasses of <code>FileInputFormat</code> can also override the 
 * {@link #isSplitable(FileSystem, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public static final Log LOG =
    LogFactory.getLog(FileInputFormat.class);

  @Deprecated
  public static enum Counter { 
    BYTES_READ
  }

  public static final String NUM_INPUT_FILES =
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;

  public static final String INPUT_DIR_RECURSIVE = 
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;


  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private long minSplitSize = 1;
  private static final PathFilter hiddenFileFilter = new PathFilter(){
      public boolean accept(Path p){
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };
  protected void setMinSplitSize(long minSplitSize) {
    this.minSplitSize = minSplitSize;
  }

  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   * 
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split up,
   * so that {@link Mapper}s process entire files.
   * 
   * @param fs the file system that the file is on
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
  }
  
  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                               JobConf job,
                                               Reporter reporter)
    throws IOException;
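
  /*
   * Sketch of a concrete subclass (illustrative only; WholeFileTextInputFormat
   * is a hypothetical name, not part of Hadoop): disable splitting so each
   * file is handled by a single mapper, and delegate record reading to the
   * existing LineRecordReader.
   *
   *   public class WholeFileTextInputFormat
   *       extends FileInputFormat<LongWritable, Text> {
   *
   *     @Override
   *     protected boolean isSplitable(FileSystem fs, Path file) {
   *       return false;                       // one split per file
   *     }
   *
   *     @Override
   *     public RecordReader<LongWritable, Text> getRecordReader(
   *         InputSplit split, JobConf job, Reporter reporter)
   *         throws IOException {
   *       reporter.setStatus(split.toString());
   *       return new LineRecordReader(job, (FileSplit) split);
   *     }
   *   }
   */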

  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param conf the JobConf to modify
   * @param filter the PathFilter class to use for filtering the input paths.
   */
  public static void setInputPathFilter(JobConf conf,
                                        Class<? extends PathFilter> filter) {
    conf.setClass(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
  }

  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param conf the JobConf to query
   * @return the PathFilter instance set for the job, or null if none has been set.
   */
  public static PathFilter getInputPathFilter(JobConf conf) {
    Class<? extends PathFilter> filterClass = conf.getClass(
          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
          null, PathFilter.class);
    return (filterClass != null) ?
        ReflectionUtils.newInstance(filterClass, conf) : null;
  }
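
  /*
   * Example (sketch): skip temporary files in addition to the built-in
   * hidden-file filtering. SkipTmpFilesFilter is a hypothetical user class;
   * it needs a no-argument constructor because the filter is instantiated
   * through ReflectionUtils.newInstance().
   *
   *   public static class SkipTmpFilesFilter implements PathFilter {
   *     public boolean accept(Path p) {
   *       return !p.getName().endsWith(".tmp");   // ignore temporary files
   *     }
   *   }
   *
   *   JobConf job = new JobConf();
   *   FileInputFormat.setInputPathFilter(job, SkipTmpFilesFilter.class);
   */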

  /**
   * Add all files under the given input path, recursively, to the results.
   * @param result
   *          The List to store all files.
   * @param fs
   *          The FileSystem.
   * @param path
   *          The input path.
   * @param inputFilter
   *          The input filter that can be used to filter files/dirs.
   * @throws IOException if listing a directory fails.
   */
  protected void addInputPathRecursively(List<FileStatus> result,
      FileSystem fs, Path path, PathFilter inputFilter) 
      throws IOException {
    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
    while (iter.hasNext()) {
      LocatedFileStatus stat = iter.next();
      if (inputFilter.accept(stat.getPath())) {
        if (stat.isDirectory()) {
          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
        } else {
          result.add(stat);
        }
      }
    }
  }
  
  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression. 
   * 
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths have been specified or a path
   *         cannot be listed.
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems.
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
    
    // Whether we need to look recursively into the directory structure
    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);

    // Creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    FileStatus[] result;
    int numThreads = job
        .getInt(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
    
    Stopwatch sw = new Stopwatch().start();
    if (numThreads == 1) {
      List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive); 
      result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
    } else {
      Iterable<FileStatus> locatedFiles = null;
      try {
        
        LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
            job, dirs, recursive, inputFilter, false);
        locatedFiles = locatedFileStatusFetcher.getFileStatuses();
      } catch (InterruptedException e) {
        throw new IOException("Interrupted while getting file statuses", e);
      }
      result = Iterables.toArray(locatedFiles, FileStatus.class);
    }

    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
    }
    LOG.info("Total input paths to process : " + result.length);
    return result;
  }
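
  /*
   * The listing above is driven by two job settings (a sketch; the constant
   * names come from this class and the new-API FileInputFormat referenced
   * above):
   *
   *   JobConf job = new JobConf();
   *   // descend into sub-directories instead of rejecting nested dirs later
   *   job.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
   *   // list many input directories in parallel
   *   job.setInt(org.apache.hadoop.mapreduce.lib.input.FileInputFormat
   *       .LIST_STATUS_NUM_THREADS, 5);
   */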

  private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
      PathFilter inputFilter, boolean recursive) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    List<IOException> errors = new ArrayList<IOException>();
    for (Path p: dirs) {
      FileSystem fs = p.getFileSystem(job); 
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }
    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    return result;
  }

  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length, 
                                String[] hosts) {
    return new FileSplit(file, start, length, hosts);
  }
  
  /**
   * A factory that makes the split for this class. It can be overridden
   * by sub-classes to make sub-types.
   */
  protected FileSplit makeSplit(Path file, long start, long length, 
                                String[] hosts, String[] inMemoryHosts) {
    return new FileSplit(file, start, length, hosts, inMemoryHosts);
  }
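
  /*
   * Sketch of overriding the factory: a subclass that needs splits carrying
   * extra per-split state can return its own FileSplit subtype here. Both
   * TaggedFileSplit and the tag field below are hypothetical illustrations.
   *
   *   @Override
   *   protected FileSplit makeSplit(Path file, long start, long length,
   *       String[] hosts, String[] inMemoryHosts) {
   *     return new TaggedFileSplit(file, start, length, hosts, inMemoryHosts,
   *         this.tag);
   *   }
   */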

  /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big. */
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    Stopwatch sw = new Stopwatch().start();
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, 0, length, clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }
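
  /*
   * Worked example of the SPLIT_SLOP rule above: with splitSize = 128 MB and
   * a 260 MB file, the first iteration emits a 128 MB split (260/128 = 2.03 >
   * 1.1); the remaining 132 MB is not split again because 132/128 = 1.03 <=
   * 1.1, so the file becomes two splits of 128 MB and 132 MB instead of
   * leaving a tiny 4 MB tail split.
   */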

  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }
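
  /*
   * Example arithmetic: for goalSize = totalSize / numSplits = 200 MB,
   * blockSize = 128 MB and minSize = 1, this returns
   * max(1, min(200 MB, 128 MB)) = 128 MB, i.e. splits default to one block.
   * Raising mapreduce.input.fileinputformat.split.minsize (SPLIT_MINSIZE)
   * above the block size forces larger, multi-block splits.
   */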

  protected int getBlockIndex(BlockLocation[] blkLocations, 
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset + 
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs 
   * for the map-reduce job.
   * 
   * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma separated paths to be set as 
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
    setInputPaths(conf, StringUtils.stringToPath(
                        getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   *  the map-reduce job.
   * 
   * @param conf The configuration of the job 
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(conf, new Path(str));
    }
  }

  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   * 
   * @param conf Configuration of the job. 
   * @param inputPaths the {@link Path}s of the input directories/files 
   * for the map-reduce job.
   */ 
  public static void setInputPaths(JobConf conf, Path... inputPaths) {
    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for(int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   * 
   * @param conf The configuration of the job 
   * @param path {@link Path} to be added to the list of inputs for 
   *            the map-reduce job.
   */
  public static void addInputPath(JobConf conf, Path path) {
    path = new Path(conf.getWorkingDirectory(), path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR);
    conf.set(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
      dirs + StringUtils.COMMA_STR + dirStr);
  }
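
  /*
   * Typical driver usage of the path setters above (a sketch; the path names
   * and the MyDriver class are illustrative):
   *
   *   JobConf job = new JobConf(MyDriver.class);
   *   FileInputFormat.setInputPaths(job,
   *       new Path("/data/2014/01"), new Path("/data/2014/02"));
   *   FileInputFormat.addInputPath(job, new Path("/data/extra"));
   *   // or, equivalently, as one comma separated (glob-capable) string:
   *   FileInputFormat.setInputPaths(job, "/data/2014/{01,02},/data/extra");
   */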

  // Splits the comma separated path list, treating commas that appear inside
  // curly-brace glob patterns (e.g. "{a,b}") as part of the path rather than
  // as separators.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();
    
    for (int i=0; i<length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch(ch) {
        case '{' : {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}' : {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',' : {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
        default:
          continue; // nothing special to do for this character
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
    
    return pathStrings.toArray(new String[0]);
  }
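
  /*
   * Example: getPathStrings("/logs/{2014,2015},/archive") returns
   * { "/logs/{2014,2015}", "/archive" } -- the comma inside the curly-brace
   * glob is preserved, while the comma between the two paths splits them.
   */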

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   * 
   * @param conf The configuration of the job 
   * @return the list of input {@link Path}s for the map-reduce job.
   */
  public static Path[] getInputPaths(JobConf conf) {
    String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.INPUT_DIR, "");
    String [] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }


  private void sortInDescendingOrder(List<NodeInfo> mylist) {
    Collections.sort(mylist, new Comparator<NodeInfo> () {
      public int compare(NodeInfo obj1, NodeInfo obj2) {

        if (obj1 == null || obj2 == null)
          return -1;

        if (obj1.getValue() == obj2.getValue()) {
          return 0;
        }
        else {
          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
        }
      }
    }
    );
  }

  /** 
   * This function identifies and returns the hosts that contribute 
   * the most to a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that 
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the size of the split
   * @param clusterMap the network topology used to resolve and register
   *    host/rack nodes
   * @return an array of hosts that contribute most to this split
   * @throws IOException
   */
  protected String[] getSplitHosts(BlockLocation[] blkLocations, 
      long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
    return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
        clusterMap)[0];
  }
  
  /** 
   * This function identifies and returns the hosts that contribute 
   * the most to a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that 
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the size of the split
   * @param clusterMap the network topology used to resolve and register
   *    host/rack nodes
   * @return two arrays - one of hosts that contribute most to this split, and
   *    one of hosts that contribute most to this split that have the data
   *    cached on them
   * @throws IOException
   */
  private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, 
      long offset, long splitSize, NetworkTopology clusterMap)
  throws IOException {

    int startIndex = getBlockIndex(blkLocations, offset);

    long bytesInThisBlock = blkLocations[startIndex].getOffset() + 
                          blkLocations[startIndex].getLength() - offset;

    // If the split lies entirely within this block, just return its hosts
    if (bytesInThisBlock >= splitSize) {
      return new String[][] { blkLocations[startIndex].getHosts(),
          blkLocations[startIndex].getCachedHosts() };
    }

    long bytesInFirstBlock = bytesInThisBlock;
    int index = startIndex + 1;
    splitSize -= bytesInThisBlock;

    while (splitSize > 0) {
      bytesInThisBlock =
        Math.min(splitSize, blkLocations[index++].getLength());
      splitSize -= bytesInThisBlock;
    }

    long bytesInLastBlock = bytesInThisBlock;
    int endIndex = index - 1;
    
    Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
    Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
    String [] allTopos = new String[0];

    // Build the hierarchy and aggregate the contribution of 
    // bytes at each level. See TestGetSplitHosts.java 

    for (index = startIndex; index <= endIndex; index++) {

      // Establish the bytes in this block
      if (index == startIndex) {
        bytesInThisBlock = bytesInFirstBlock;
      }
      else if (index == endIndex) {
        bytesInThisBlock = bytesInLastBlock;
      }
      else {
        bytesInThisBlock = blkLocations[index].getLength();
      }
      
      allTopos = blkLocations[index].getTopologyPaths();

      // If no topology information is available, just
      // prefix a fakeRack
      if (allTopos.length == 0) {
        allTopos = fakeRacks(blkLocations, index);
      }

      // NOTE: This code currently works only for one level of
      // hierarchy (rack/host). However, it is relatively easy
      // to extend this to support aggregation at different
      // levels 
      
      for (String topo: allTopos) {

        Node node, parentNode;
        NodeInfo nodeInfo, parentNodeInfo;

        node = clusterMap.getNode(topo);

        if (node == null) {
          node = new NodeBase(topo);
          clusterMap.add(node);
        }
        
        nodeInfo = hostsMap.get(node);
        
        if (nodeInfo == null) {
          nodeInfo = new NodeInfo(node);
          hostsMap.put(node,nodeInfo);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
          if (parentNodeInfo == null) {
            parentNodeInfo = new NodeInfo(parentNode);
            racksMap.put(parentNode,parentNodeInfo);
          }
          parentNodeInfo.addLeaf(nodeInfo);
        }
        else {
          nodeInfo = hostsMap.get(node);
          parentNode = node.getParent();
          parentNodeInfo = racksMap.get(parentNode);
        }

        nodeInfo.addValue(index, bytesInThisBlock);
        parentNodeInfo.addValue(index, bytesInThisBlock);

      } // for all topos
    
    } // for all indices

    // We don't yet support cached hosts when bytesInThisBlock > splitSize
    return new String[][] { identifyHosts(allTopos.length, racksMap),
        new String[0]};
  }
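
  /*
   * Worked example of the aggregation above (sketch): suppose a split spans
   * two 128 MB blocks, host h1 on rack r1 holds replicas of both blocks, and
   * host h2 on rack r2 holds only the second. Then h1 and r1 each accumulate
   * 256 MB while h2 and r2 accumulate 128 MB, so identifyHosts() ranks r1
   * before r2 and h1 before h2 when picking the split's preferred hosts.
   */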

  private String[] identifyHosts(int replicationFactor, 
                                 Map<Node,NodeInfo> racksMap) {
    
    String [] retVal = new String[replicationFactor];
   
    List <NodeInfo> rackList = new LinkedList<NodeInfo>(); 

    rackList.addAll(racksMap.values());
    
    // Sort the racks based on their contribution to this split
    sortInDescendingOrder(rackList);
    
    boolean done = false;
    int index = 0;
    
    // Get the host list for all our aggregated items, sort
    // them and return the top entries
    for (NodeInfo ni: rackList) {

      Set<NodeInfo> hostSet = ni.getLeaves();

      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
      hostList.addAll(hostSet);
    
      // Sort the hosts in this rack based on their contribution
      sortInDescendingOrder(hostList);

      for (NodeInfo host: hostList) {
        // Strip out the port number from the host name
        retVal[index++] = host.node.getName().split(":")[0];
        if (index == replicationFactor) {
          done = true;
          break;
        }
      }
      
      if (done) {
        break;
      }
    }
    return retVal;
  }
  
  private String[] fakeRacks(BlockLocation[] blkLocations, int index) 
  throws IOException {
    String[] allHosts = blkLocations[index].getHosts();
    String[] allTopos = new String[allHosts.length];
    for (int i = 0; i < allHosts.length; i++) {
      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
    }
    return allTopos;
  }


  private static class NodeInfo {
    final Node node;
    final Set<Integer> blockIds;
    final Set<NodeInfo> leaves;

    private long value;
    
    NodeInfo(Node node) {
      this.node = node;
      blockIds = new HashSet<Integer>();
      leaves = new HashSet<NodeInfo>();
    }

    long getValue() {return value;}

    void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex)) {
        this.value += value;
      }
    }

    Set<NodeInfo> getLeaves() { return leaves;}

    void addLeaf(NodeInfo nodeInfo) {
      leaves.add(nodeInfo);
    }
  }
}