001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Collections;
024    import java.util.Comparator;
025    import java.util.HashSet;
026    import java.util.IdentityHashMap;
027    import java.util.LinkedList;
028    import java.util.List;
029    import java.util.Map;
030    import java.util.Set;
031    
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    import org.apache.hadoop.classification.InterfaceAudience;
035    import org.apache.hadoop.classification.InterfaceStability;
036    import org.apache.hadoop.fs.BlockLocation;
037    import org.apache.hadoop.fs.FileStatus;
038    import org.apache.hadoop.fs.FileSystem;
039    import org.apache.hadoop.fs.LocatedFileStatus;
040    import org.apache.hadoop.fs.Path;
041    import org.apache.hadoop.fs.PathFilter;
042    import org.apache.hadoop.fs.RemoteIterator;
043    import org.apache.hadoop.mapreduce.security.TokenCache;
044    import org.apache.hadoop.net.NetworkTopology;
045    import org.apache.hadoop.net.Node;
046    import org.apache.hadoop.net.NodeBase;
047    import org.apache.hadoop.util.ReflectionUtils;
048    import org.apache.hadoop.util.StringUtils;
049    
050    import com.google.common.base.Stopwatch;
051    import com.google.common.collect.Iterables;
052    
053    /** 
054     * A base class for file-based {@link InputFormat}.
055     * 
056     * <p><code>FileInputFormat</code> is the base class for all file-based 
 * <code>InputFormat</code>s. It provides a generic implementation of
058     * {@link #getSplits(JobConf, int)}.
059     * Subclasses of <code>FileInputFormat</code> can also override the 
 * {@link #isSplitable(FileSystem, Path)} method to ensure input files are
 * not split up and are processed as a whole by {@link Mapper}s.
062     */
063    @InterfaceAudience.Public
064    @InterfaceStability.Stable
065    public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
066    
067      public static final Log LOG =
068        LogFactory.getLog(FileInputFormat.class);
069      
070      @Deprecated
071      public static enum Counter { 
072        BYTES_READ
073      }
074    
075      public static final String NUM_INPUT_FILES =
076        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
077      
078      public static final String INPUT_DIR_RECURSIVE = 
079        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;
080    
081    
082      private static final double SPLIT_SLOP = 1.1;   // 10% slop
083    
084      private long minSplitSize = 1;
085      private static final PathFilter hiddenFileFilter = new PathFilter(){
086          public boolean accept(Path p){
087            String name = p.getName(); 
088            return !name.startsWith("_") && !name.startsWith("."); 
089          }
090        }; 
091      protected void setMinSplitSize(long minSplitSize) {
092        this.minSplitSize = minSplitSize;
093      }
094    
095      /**
096       * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
099       */
100      private static class MultiPathFilter implements PathFilter {
101        private List<PathFilter> filters;
102    
103        public MultiPathFilter(List<PathFilter> filters) {
104          this.filters = filters;
105        }
106    
107        public boolean accept(Path path) {
108          for (PathFilter filter : filters) {
109            if (!filter.accept(path)) {
110              return false;
111            }
112          }
113          return true;
114        }
115      }
116    
117      /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream-compressed, it will not be.
120       * 
121       * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split
   * up, so that {@link Mapper}s process entire files.
124       * 
125       * @param fs the file system that the file is on
126       * @param filename the file name to check
127       * @return is this file splitable?
128       */
129      protected boolean isSplitable(FileSystem fs, Path filename) {
130        return true;
131      }
132      
133      public abstract RecordReader<K, V> getRecordReader(InputSplit split,
134                                                   JobConf job,
135                                                   Reporter reporter)
136        throws IOException;
137    
138      /**
139       * Set a PathFilter to be applied to the input paths for the map-reduce job.
140       *
   * @param conf the job configuration
   * @param filter the PathFilter class used for filtering the input paths.
142       */
143      public static void setInputPathFilter(JobConf conf,
144                                            Class<? extends PathFilter> filter) {
145        conf.setClass(org.apache.hadoop.mapreduce.lib.input.
146          FileInputFormat.PATHFILTER_CLASS, filter, PathFilter.class);
147      }
148    
149      /**
150       * Get a PathFilter instance of the filter set for the input paths.
151       *
   * @param conf the job configuration
   * @return the PathFilter instance set for the job, or <code>null</code> if
   *         none has been set.
153       */
154      public static PathFilter getInputPathFilter(JobConf conf) {
155        Class<? extends PathFilter> filterClass = conf.getClass(
156              org.apache.hadoop.mapreduce.lib.input.FileInputFormat.PATHFILTER_CLASS,
157              null, PathFilter.class);
158        return (filterClass != null) ?
159            ReflectionUtils.newInstance(filterClass, conf) : null;
160      }
161    
162      /**
   * Recursively add files under the given input path to the results.
164       * @param result
165       *          The List to store all files.
166       * @param fs
167       *          The FileSystem.
168       * @param path
169       *          The input path.
170       * @param inputFilter
171       *          The input filter that can be used to filter files/dirs. 
   * @throws IOException if listing the path fails.
173       */
174      protected void addInputPathRecursively(List<FileStatus> result,
175          FileSystem fs, Path path, PathFilter inputFilter) 
176          throws IOException {
177        RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
178        while (iter.hasNext()) {
179          LocatedFileStatus stat = iter.next();
180          if (inputFilter.accept(stat.getPath())) {
181            if (stat.isDirectory()) {
182              addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
183            } else {
184              result.add(stat);
185            }
186          }
187        }
188      }
189      
190      /** List input directories.
191       * Subclasses may override to, e.g., select only files matching a regular
192       * expression. 
193       * 
194       * @param job the job to list input paths for
195       * @return array of FileStatus objects
   * @throws IOException if no input paths are specified or the input files
   *         cannot be listed.
197       */
198      protected FileStatus[] listStatus(JobConf job) throws IOException {
199        Path[] dirs = getInputPaths(job);
200        if (dirs.length == 0) {
201          throw new IOException("No input paths specified in job");
202        }
203    
    // get tokens for all the required FileSystems.
205        TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
206        
    // Whether we need to recursively look into the directory structure
208        boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
209    
    // Create a MultiPathFilter from the built-in hiddenFileFilter and the
    // user-provided filter (if any).
212        List<PathFilter> filters = new ArrayList<PathFilter>();
213        filters.add(hiddenFileFilter);
214        PathFilter jobFilter = getInputPathFilter(job);
215        if (jobFilter != null) {
216          filters.add(jobFilter);
217        }
218        PathFilter inputFilter = new MultiPathFilter(filters);
219    
220        FileStatus[] result;
221        int numThreads = job
222            .getInt(
223                org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
224                org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
225        
226        Stopwatch sw = new Stopwatch().start();
227        if (numThreads == 1) {
228          List<FileStatus> locatedFiles = singleThreadedListStatus(job, dirs, inputFilter, recursive); 
229          result = locatedFiles.toArray(new FileStatus[locatedFiles.size()]);
230        } else {
231          Iterable<FileStatus> locatedFiles = null;
232          try {
233            
234            LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
235                job, dirs, recursive, inputFilter, false);
236            locatedFiles = locatedFileStatusFetcher.getFileStatuses();
237          } catch (InterruptedException e) {
238            throw new IOException("Interrupted while getting file statuses");
239          }
240          result = Iterables.toArray(locatedFiles, FileStatus.class);
241        }
242    
243        sw.stop();
244        if (LOG.isDebugEnabled()) {
245          LOG.debug("Time taken to get FileStatuses: " + sw.elapsedMillis());
246        }
247        LOG.info("Total input paths to process : " + result.length);
248        return result;
249      }
250      
251      private List<FileStatus> singleThreadedListStatus(JobConf job, Path[] dirs,
252          PathFilter inputFilter, boolean recursive) throws IOException {
253        List<FileStatus> result = new ArrayList<FileStatus>();
254        List<IOException> errors = new ArrayList<IOException>();
255        for (Path p: dirs) {
256          FileSystem fs = p.getFileSystem(job); 
257          FileStatus[] matches = fs.globStatus(p, inputFilter);
258          if (matches == null) {
259            errors.add(new IOException("Input path does not exist: " + p));
260          } else if (matches.length == 0) {
261            errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
262          } else {
263            for (FileStatus globStat: matches) {
264              if (globStat.isDirectory()) {
265                RemoteIterator<LocatedFileStatus> iter =
266                    fs.listLocatedStatus(globStat.getPath());
267                while (iter.hasNext()) {
268                  LocatedFileStatus stat = iter.next();
269                  if (inputFilter.accept(stat.getPath())) {
270                    if (recursive && stat.isDirectory()) {
271                      addInputPathRecursively(result, fs, stat.getPath(),
272                          inputFilter);
273                    } else {
274                      result.add(stat);
275                    }
276                  }
277                }
278              } else {
279                result.add(globStat);
280              }
281            }
282          }
283        }
284        if (!errors.isEmpty()) {
285          throw new InvalidInputException(errors);
286        }
287        return result;
288      }
289    
290      /**
   * A factory method that makes the split for this class. It can be overridden
   * by subclasses to return sub-types of {@link FileSplit}.
293       */
294      protected FileSplit makeSplit(Path file, long start, long length, 
295                                    String[] hosts) {
296        return new FileSplit(file, start, length, hosts);
297      }
298    
299      /** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big. */
301      public InputSplit[] getSplits(JobConf job, int numSplits)
302        throws IOException {
303        Stopwatch sw = new Stopwatch().start();
304        FileStatus[] files = listStatus(job);
305        
306        // Save the number of input files for metrics/loadgen
307        job.setLong(NUM_INPUT_FILES, files.length);
308        long totalSize = 0;                           // compute total size
309        for (FileStatus file: files) {                // check we have valid files
310          if (file.isDirectory()) {
311            throw new IOException("Not a file: "+ file.getPath());
312          }
313          totalSize += file.getLen();
314        }
315    
316        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
317        long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
318          FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
319    
320        // generate splits
321        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
322        NetworkTopology clusterMap = new NetworkTopology();
323        for (FileStatus file: files) {
324          Path path = file.getPath();
325          long length = file.getLen();
326          if (length != 0) {
327            FileSystem fs = path.getFileSystem(job);
328            BlockLocation[] blkLocations;
329            if (file instanceof LocatedFileStatus) {
330              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
331            } else {
332              blkLocations = fs.getFileBlockLocations(file, 0, length);
333            }
334            if (isSplitable(fs, path)) {
335              long blockSize = file.getBlockSize();
336              long splitSize = computeSplitSize(goalSize, minSize, blockSize);
337    
338              long bytesRemaining = length;
339              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
340                String[] splitHosts = getSplitHosts(blkLocations,
341                    length-bytesRemaining, splitSize, clusterMap);
342                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
343                    splitHosts));
344                bytesRemaining -= splitSize;
345              }
346    
347              if (bytesRemaining != 0) {
348                String[] splitHosts = getSplitHosts(blkLocations, length
349                    - bytesRemaining, bytesRemaining, clusterMap);
350                splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
351                    splitHosts));
352              }
353            } else {
354              String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
355              splits.add(makeSplit(path, 0, length, splitHosts));
356            }
357          } else { 
        // Create an empty hosts array for zero-length files
359            splits.add(makeSplit(path, 0, length, new String[0]));
360          }
361        }
362        sw.stop();
363        if (LOG.isDebugEnabled()) {
364          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
365              + ", TimeTaken: " + sw.elapsedMillis());
366        }
367        return splits.toArray(new FileSplit[splits.size()]);
368      }
369    
370      protected long computeSplitSize(long goalSize, long minSize,
371                                           long blockSize) {
372        return Math.max(minSize, Math.min(goalSize, blockSize));
373      }
374    
375      protected int getBlockIndex(BlockLocation[] blkLocations, 
376                                  long offset) {
377        for (int i = 0 ; i < blkLocations.length; i++) {
378          // is the offset inside this block?
379          if ((blkLocations[i].getOffset() <= offset) &&
380              (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
381            return i;
382          }
383        }
384        BlockLocation last = blkLocations[blkLocations.length -1];
385        long fileLength = last.getOffset() + last.getLength() -1;
386        throw new IllegalArgumentException("Offset " + offset + 
387                                           " is outside of file (0.." +
388                                           fileLength + ")");
389      }
390    
391      /**
   * Sets the given comma-separated paths as the list of inputs 
393       * for the map-reduce job.
394       * 
395       * @param conf Configuration of the job
   * @param commaSeparatedPaths Comma-separated paths to be set as 
397       *        the list of inputs for the map-reduce job.
398       */
399      public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
400        setInputPaths(conf, StringUtils.stringToPath(
401                            getPathStrings(commaSeparatedPaths)));
402      }
403    
404      /**
   * Add the given comma-separated paths to the list of inputs for
   * the map-reduce job.
407       * 
408       * @param conf The configuration of the job 
   * @param commaSeparatedPaths Comma-separated paths to be added to
410       *        the list of inputs for the map-reduce job.
411       */
412      public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
413        for (String str : getPathStrings(commaSeparatedPaths)) {
414          addInputPath(conf, new Path(str));
415        }
416      }
417    
418      /**
419       * Set the array of {@link Path}s as the list of inputs
420       * for the map-reduce job.
421       * 
422       * @param conf Configuration of the job. 
423       * @param inputPaths the {@link Path}s of the input directories/files 
424       * for the map-reduce job.
425       */ 
426      public static void setInputPaths(JobConf conf, Path... inputPaths) {
427        Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
428        StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
429        for(int i = 1; i < inputPaths.length;i++) {
430          str.append(StringUtils.COMMA_STR);
431          path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
432          str.append(StringUtils.escapeString(path.toString()));
433        }
434        conf.set(org.apache.hadoop.mapreduce.lib.input.
435          FileInputFormat.INPUT_DIR, str.toString());
436      }
437    
438      /**
439       * Add a {@link Path} to the list of inputs for the map-reduce job.
440       * 
441       * @param conf The configuration of the job 
442       * @param path {@link Path} to be added to the list of inputs for 
443       *            the map-reduce job.
444       */
445      public static void addInputPath(JobConf conf, Path path ) {
446        path = new Path(conf.getWorkingDirectory(), path);
447        String dirStr = StringUtils.escapeString(path.toString());
448        String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
449          FileInputFormat.INPUT_DIR);
450        conf.set(org.apache.hadoop.mapreduce.lib.input.
451          FileInputFormat.INPUT_DIR, dirs == null ? dirStr :
452          dirs + StringUtils.COMMA_STR + dirStr);
453      }
454             
  // Splits the given comma-separated path list into individual path strings.
  // Commas inside a glob's curly braces are treated as part of the path
  // rather than as separators.
456      private static String[] getPathStrings(String commaSeparatedPaths) {
457        int length = commaSeparatedPaths.length();
458        int curlyOpen = 0;
459        int pathStart = 0;
460        boolean globPattern = false;
461        List<String> pathStrings = new ArrayList<String>();
462        
463        for (int i=0; i<length; i++) {
464          char ch = commaSeparatedPaths.charAt(i);
465          switch(ch) {
466            case '{' : {
467              curlyOpen++;
468              if (!globPattern) {
469                globPattern = true;
470              }
471              break;
472            }
473            case '}' : {
474              curlyOpen--;
475              if (curlyOpen == 0 && globPattern) {
476                globPattern = false;
477              }
478              break;
479            }
480            case ',' : {
481              if (!globPattern) {
482                pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
483                pathStart = i + 1 ;
484              }
485              break;
486            }
487            default:
488              continue; // nothing special to do for this character
489          }
490        }
491        pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
492        
493        return pathStrings.toArray(new String[0]);
494      }
495      
496      /**
497       * Get the list of input {@link Path}s for the map-reduce job.
498       * 
499       * @param conf The configuration of the job 
500       * @return the list of input {@link Path}s for the map-reduce job.
501       */
502      public static Path[] getInputPaths(JobConf conf) {
503        String dirs = conf.get(org.apache.hadoop.mapreduce.lib.input.
504          FileInputFormat.INPUT_DIR, "");
505        String [] list = StringUtils.split(dirs);
506        Path[] result = new Path[list.length];
507        for (int i = 0; i < list.length; i++) {
508          result[i] = new Path(StringUtils.unEscapeString(list[i]));
509        }
510        return result;
511      }
512      
513    
514      private void sortInDescendingOrder(List<NodeInfo> mylist) {
515        Collections.sort(mylist, new Comparator<NodeInfo> () {
516          public int compare(NodeInfo obj1, NodeInfo obj2) {
517    
518            if (obj1 == null || obj2 == null)
519              return -1;
520    
521            if (obj1.getValue() == obj2.getValue()) {
522              return 0;
523            }
524            else {
525              return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
526            }
527          }
528        }
529        );
530      }
531    
532      /** 
533       * This function identifies and returns the hosts that contribute 
   * the most to a given split. For calculating the contribution, rack
   * locality is treated on par with host locality, so hosts from racks
   * that contribute the most are preferred over hosts on racks that 
   * contribute less.
   * @param blkLocations The list of block locations
   * @param offset the start offset of the split within the file
   * @param splitSize the size of the split
   * @param clusterMap the network topology used to track rack membership
541       * @return array of hosts that contribute most to this split
542       * @throws IOException
543       */
544      protected String[] getSplitHosts(BlockLocation[] blkLocations, 
545          long offset, long splitSize, NetworkTopology clusterMap)
546      throws IOException {
547    
548        int startIndex = getBlockIndex(blkLocations, offset);
549    
550        long bytesInThisBlock = blkLocations[startIndex].getOffset() + 
551                              blkLocations[startIndex].getLength() - offset;
552    
    // If the split fits entirely within this block, just return its hosts
554        if (bytesInThisBlock >= splitSize) {
555          return blkLocations[startIndex].getHosts();
556        }
557    
558        long bytesInFirstBlock = bytesInThisBlock;
559        int index = startIndex + 1;
560        splitSize -= bytesInThisBlock;
561    
562        while (splitSize > 0) {
563          bytesInThisBlock =
564            Math.min(splitSize, blkLocations[index++].getLength());
565          splitSize -= bytesInThisBlock;
566        }
567    
568        long bytesInLastBlock = bytesInThisBlock;
569        int endIndex = index - 1;
570        
571        Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
572        Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
573        String [] allTopos = new String[0];
574    
575        // Build the hierarchy and aggregate the contribution of 
576        // bytes at each level. See TestGetSplitHosts.java 
577    
578        for (index = startIndex; index <= endIndex; index++) {
579    
580          // Establish the bytes in this block
581          if (index == startIndex) {
582            bytesInThisBlock = bytesInFirstBlock;
583          }
584          else if (index == endIndex) {
585            bytesInThisBlock = bytesInLastBlock;
586          }
587          else {
588            bytesInThisBlock = blkLocations[index].getLength();
589          }
590          
591          allTopos = blkLocations[index].getTopologyPaths();
592    
593          // If no topology information is available, just
594          // prefix a fakeRack
595          if (allTopos.length == 0) {
596            allTopos = fakeRacks(blkLocations, index);
597          }
598    
599          // NOTE: This code currently works only for one level of
600          // hierarchy (rack/host). However, it is relatively easy
601          // to extend this to support aggregation at different
602          // levels 
603          
604          for (String topo: allTopos) {
605    
606            Node node, parentNode;
607            NodeInfo nodeInfo, parentNodeInfo;
608    
609            node = clusterMap.getNode(topo);
610    
611            if (node == null) {
612              node = new NodeBase(topo);
613              clusterMap.add(node);
614            }
615            
616            nodeInfo = hostsMap.get(node);
617            
618            if (nodeInfo == null) {
619              nodeInfo = new NodeInfo(node);
620              hostsMap.put(node,nodeInfo);
621              parentNode = node.getParent();
622              parentNodeInfo = racksMap.get(parentNode);
623              if (parentNodeInfo == null) {
624                parentNodeInfo = new NodeInfo(parentNode);
625                racksMap.put(parentNode,parentNodeInfo);
626              }
627              parentNodeInfo.addLeaf(nodeInfo);
628            }
629            else {
630              nodeInfo = hostsMap.get(node);
631              parentNode = node.getParent();
632              parentNodeInfo = racksMap.get(parentNode);
633            }
634    
635            nodeInfo.addValue(index, bytesInThisBlock);
636            parentNodeInfo.addValue(index, bytesInThisBlock);
637    
638          } // for all topos
639        
640        } // for all indices
641    
642        return identifyHosts(allTopos.length, racksMap);
643      }
644      
645      private String[] identifyHosts(int replicationFactor, 
646                                     Map<Node,NodeInfo> racksMap) {
647        
648        String [] retVal = new String[replicationFactor];
649       
650        List <NodeInfo> rackList = new LinkedList<NodeInfo>(); 
651    
652        rackList.addAll(racksMap.values());
653        
654        // Sort the racks based on their contribution to this split
655        sortInDescendingOrder(rackList);
656        
657        boolean done = false;
658        int index = 0;
659        
660        // Get the host list for all our aggregated items, sort
661        // them and return the top entries
662        for (NodeInfo ni: rackList) {
663    
664          Set<NodeInfo> hostSet = ni.getLeaves();
665    
666          List<NodeInfo>hostList = new LinkedList<NodeInfo>();
667          hostList.addAll(hostSet);
668        
669          // Sort the hosts in this rack based on their contribution
670          sortInDescendingOrder(hostList);
671    
672          for (NodeInfo host: hostList) {
673            // Strip out the port number from the host name
674            retVal[index++] = host.node.getName().split(":")[0];
675            if (index == replicationFactor) {
676              done = true;
677              break;
678            }
679          }
680          
      if (done) {
682            break;
683          }
684        }
685        return retVal;
686      }
687      
688      private String[] fakeRacks(BlockLocation[] blkLocations, int index) 
689      throws IOException {
690        String[] allHosts = blkLocations[index].getHosts();
691        String[] allTopos = new String[allHosts.length];
692        for (int i = 0; i < allHosts.length; i++) {
693          allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
694        }
695        return allTopos;
696      }
697    
698    
699      private static class NodeInfo {
700        final Node node;
701        final Set<Integer> blockIds;
702        final Set<NodeInfo> leaves;
703    
704        private long value;
705        
706        NodeInfo(Node node) {
707          this.node = node;
708          blockIds = new HashSet<Integer>();
709          leaves = new HashSet<NodeInfo>();
710        }
711    
712        long getValue() {return value;}
713    
714        void addValue(int blockIndex, long value) {
      if (blockIds.add(blockIndex)) {
716            this.value += value;
717          }
718        }
719    
720        Set<NodeInfo> getLeaves() { return leaves;}
721    
722        void addLeaf(NodeInfo nodeInfo) {
723          leaves.add(nodeInfo);
724        }
725      }
726    }