001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Collection;
024    import java.util.Collections;
025    import java.util.LinkedHashSet;
026    import java.util.HashSet;
027    import java.util.List;
028    import java.util.HashMap;
029    import java.util.Set;
030    import java.util.Iterator;
031    import java.util.Map;
032    import java.util.Map.Entry;
033    
034    import org.apache.commons.logging.Log;
035    import org.apache.commons.logging.LogFactory;
036    import org.apache.hadoop.classification.InterfaceAudience;
037    import org.apache.hadoop.classification.InterfaceStability;
038    import org.apache.hadoop.conf.Configuration;
039    import org.apache.hadoop.fs.FileSystem;
040    import org.apache.hadoop.fs.LocatedFileStatus;
041    import org.apache.hadoop.fs.Path;
042    import org.apache.hadoop.fs.BlockLocation;
043    import org.apache.hadoop.fs.FileStatus;
044    import org.apache.hadoop.fs.PathFilter;
045    import org.apache.hadoop.io.compress.CompressionCodec;
046    import org.apache.hadoop.io.compress.CompressionCodecFactory;
047    import org.apache.hadoop.io.compress.SplittableCompressionCodec;
048    import org.apache.hadoop.mapreduce.InputFormat;
049    import org.apache.hadoop.mapreduce.InputSplit;
050    import org.apache.hadoop.mapreduce.JobContext;
051    import org.apache.hadoop.mapreduce.RecordReader;
052    import org.apache.hadoop.mapreduce.TaskAttemptContext;
053    import org.apache.hadoop.net.NodeBase;
054    import org.apache.hadoop.net.NetworkTopology;
055    
056    import com.google.common.annotations.VisibleForTesting;
057    import com.google.common.collect.HashMultiset;
058    import com.google.common.collect.Multiset;
059    
060    /**
 * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
 * the {@link InputFormat#getSplits(JobContext)} method.
063     * 
064     * Splits are constructed from the files under the input paths. 
065     * A split cannot have files from different pools.
066     * Each split returned may contain blocks from different files.
067     * If a maxSplitSize is specified, then blocks on the same node are
068     * combined to form a single split. Blocks that are left over are
069     * then combined with other blocks in the same rack. 
070     * If maxSplitSize is not specified, then blocks from the same rack
071     * are combined in a single split; no attempt is made to create
072     * node-local splits.
073     * If the maxSplitSize is equal to the block size, then this class
074     * is similar to the default splitting behavior in Hadoop: each
075     * block is a locally processed split.
076     * Subclasses implement 
077     * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
078     * to construct <code>RecordReader</code>'s for 
079     * <code>CombineFileSplit</code>'s.
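 * <p>For example, a subclass can delegate to {@link CombineFileRecordReader},
 * which runs a separate reader over each chunk of the combined split. This is
 * only an illustrative sketch (imports omitted); {@code MyChunkReader} stands
 * for a hypothetical {@link RecordReader} with the
 * {@code (CombineFileSplit, TaskAttemptContext, Integer)} constructor that
 * {@link CombineFileRecordReader} expects:
 * <pre>
 * public class MyCombineInputFormat
 *     extends CombineFileInputFormat&lt;LongWritable, Text&gt; {
 *   public RecordReader&lt;LongWritable, Text&gt; createRecordReader(
 *       InputSplit split, TaskAttemptContext context) throws IOException {
 *     return new CombineFileRecordReader&lt;LongWritable, Text&gt;(
 *         (CombineFileSplit) split, context, MyChunkReader.class);
 *   }
 * }
 * </pre>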
080     * 
081     * @see CombineFileSplit
082     */
083    @InterfaceAudience.Public
084    @InterfaceStability.Stable
085    public abstract class CombineFileInputFormat<K, V>
086      extends FileInputFormat<K, V> {
087      
088      private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);
089      
090      public static final String SPLIT_MINSIZE_PERNODE = 
091        "mapreduce.input.fileinputformat.split.minsize.per.node";
092      public static final String SPLIT_MINSIZE_PERRACK = 
093        "mapreduce.input.fileinputformat.split.minsize.per.rack";
094      // ability to limit the size of a single split
095      private long maxSplitSize = 0;
096      private long minSplitSizeNode = 0;
097      private long minSplitSizeRack = 0;
098    
  // A pool of input path filters. A split cannot have blocks from files
  // across multiple pools.
  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
102    
103      // mapping from a rack name to the set of Nodes in the rack 
104      private HashMap<String, Set<String>> rackToNodes = 
105                                new HashMap<String, Set<String>>();
106      /**
107       * Specify the maximum size (in bytes) of each split. Each split is
108       * approximately equal to the specified size.
109       */
110      protected void setMaxSplitSize(long maxSplitSize) {
111        this.maxSplitSize = maxSplitSize;
112      }
113    
114      /**
115       * Specify the minimum size (in bytes) of each split per node.
116       * This applies to data that is left over after combining data on a single
117       * node into splits that are of maximum size specified by maxSplitSize.
118       * This leftover data will be combined into its own split if its size
119       * exceeds minSplitSizeNode.
120       */
121      protected void setMinSplitSizeNode(long minSplitSizeNode) {
122        this.minSplitSizeNode = minSplitSizeNode;
123      }
124    
125      /**
126       * Specify the minimum size (in bytes) of each split per rack.
127       * This applies to data that is left over after combining data on a single
128       * rack into splits that are of maximum size specified by maxSplitSize.
129       * This leftover data will be combined into its own split if its size
130       * exceeds minSplitSizeRack.
131       */
132      protected void setMinSplitSizeRack(long minSplitSizeRack) {
133        this.minSplitSizeRack = minSplitSizeRack;
134      }
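
  // Illustrative sketch only (the byte values below are made-up examples):
  // the limits above can also be supplied through job configuration rather
  // than the protected setters, e.g. from the job driver:
  //
  //   Configuration conf = job.getConfiguration();
  //   conf.setLong("mapreduce.input.fileinputformat.split.maxsize",
  //       256 * 1024 * 1024);                                     // 256 MB
  //   conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE,
  //       64 * 1024 * 1024);                                      // 64 MB
  //   conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK,
  //       128 * 1024 * 1024);                                     // 128 MB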
135    
136      /**
137       * Create a new pool and add the filters to it.
138       * A split cannot have files from different pools.
139       */
140      protected void createPool(List<PathFilter> filters) {
141        pools.add(new MultiPathFilter(filters));
142      }
143    
144      /**
145       * Create a new pool and add the filters to it. 
146       * A pathname can satisfy any one of the specified filters.
147       * A split cannot have files from different pools.
148       */
149      protected void createPool(PathFilter... filters) {
150        MultiPathFilter multi = new MultiPathFilter();
151        for (PathFilter f: filters) {
152          multi.add(f);
153        }
154        pools.add(multi);
155      }
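
  // Illustrative sketch only: a subclass constructor might separate different
  // kinds of input files into pools so they are never combined into the same
  // split. SuffixPathFilter is a hypothetical PathFilter, used here purely as
  // an example:
  //
  //   public MyCombineInputFormat() {
  //     createPool(new SuffixPathFilter(".txt"));
  //     createPool(new SuffixPathFilter(".seq"));
  //   }
  //
  // Files that match no pool are still combined, but only with each other.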
156      
157      @Override
158      protected boolean isSplitable(JobContext context, Path file) {
159        final CompressionCodec codec =
160          new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
161        if (null == codec) {
162          return true;
163        }
164        return codec instanceof SplittableCompressionCodec;
165      }
166    
167      /**
168       * default constructor
169       */
170      public CombineFileInputFormat() {
171      }
172    
173      @Override
174      public List<InputSplit> getSplits(JobContext job) 
175        throws IOException {
176        long minSizeNode = 0;
177        long minSizeRack = 0;
178        long maxSize = 0;
179        Configuration conf = job.getConfiguration();
180    
    // the values specified by setXxxSplitSize() take precedence over the
    // values that might have been specified in the config
183        if (minSplitSizeNode != 0) {
184          minSizeNode = minSplitSizeNode;
185        } else {
186          minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
187        }
188        if (minSplitSizeRack != 0) {
189          minSizeRack = minSplitSizeRack;
190        } else {
191          minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
192        }
193        if (maxSplitSize != 0) {
194          maxSize = maxSplitSize;
195        } else {
196          maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
197          // If maxSize is not configured, a single split will be generated per
198          // node.
199        }
200        if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
201          throw new IOException("Minimum split size pernode " + minSizeNode +
202                                " cannot be larger than maximum split size " +
203                                maxSize);
204        }
205        if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
206          throw new IOException("Minimum split size per rack " + minSizeRack +
207                                " cannot be larger than maximum split size " +
208                                maxSize);
209        }
210        if (minSizeRack != 0 && minSizeNode > minSizeRack) {
211          throw new IOException("Minimum split size per node " + minSizeNode +
212                                " cannot be larger than minimum split " +
213                                "size per rack " + minSizeRack);
214        }
215    
216        // all the files in input set
217        List<FileStatus> stats = listStatus(job);
218        List<InputSplit> splits = new ArrayList<InputSplit>();
219        if (stats.size() == 0) {
220          return splits;    
221        }
222    
223        // In one single iteration, process all the paths in a single pool.
224        // Processing one pool at a time ensures that a split contains paths
225        // from a single pool only.
226        for (MultiPathFilter onepool : pools) {
227          ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
228          
      // pick one input path. If it matches any of the filters in a pool,
      // add it to that pool's set
231          for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
232            FileStatus p = iter.next();
233            if (onepool.accept(p.getPath())) {
234              myPaths.add(p); // add it to my output set
235              iter.remove();
236            }
237          }
238          // create splits for all files in this pool.
239          getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
240        }
241    
242        // create splits for all files that are not in any pool.
243        getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
244    
245        // free up rackToNodes map
246        rackToNodes.clear();
247        return splits;    
248      }
249    
250      /**
251       * Return all the splits in the specified set of paths
252       */
253      private void getMoreSplits(JobContext job, List<FileStatus> stats,
254                                 long maxSize, long minSizeNode, long minSizeRack,
255                                 List<InputSplit> splits)
256        throws IOException {
257        Configuration conf = job.getConfiguration();
258    
259        // all blocks for all the files in input set
260        OneFileInfo[] files;
261      
262        // mapping from a rack name to the list of blocks it has
263        HashMap<String, List<OneBlockInfo>> rackToBlocks = 
264                                  new HashMap<String, List<OneBlockInfo>>();
265    
266        // mapping from a block to the nodes on which it has replicas
267        HashMap<OneBlockInfo, String[]> blockToNodes = 
268                                  new HashMap<OneBlockInfo, String[]>();
269    
270        // mapping from a node to the list of blocks that it contains
271        HashMap<String, Set<OneBlockInfo>> nodeToBlocks = 
272                                  new HashMap<String, Set<OneBlockInfo>>();
273        
274        files = new OneFileInfo[stats.size()];
275        if (stats.size() == 0) {
276          return; 
277        }
278    
279        // populate all the blocks for all files
280        long totLength = 0;
281        int i = 0;
282        for (FileStatus stat : stats) {
283          files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
284                                     rackToBlocks, blockToNodes, nodeToBlocks,
285                                     rackToNodes, maxSize);
      totLength += files[i++].getLength();
287        }
288        createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, 
289                     maxSize, minSizeNode, minSizeRack, splits);
290      }
291    
292      @VisibleForTesting
293      void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
294                         Map<OneBlockInfo, String[]> blockToNodes,
295                         Map<String, List<OneBlockInfo>> rackToBlocks,
296                         long totLength,
297                         long maxSize,
298                         long minSizeNode,
299                         long minSizeRack,
300                         List<InputSplit> splits                     
301                        ) {
302        ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
303        long curSplitSize = 0;
304        
305        int totalNodes = nodeToBlocks.size();
306        long totalLength = totLength;
307    
308        Multiset<String> splitsPerNode = HashMultiset.create();
309        Set<String> completedNodes = new HashSet<String>();
310        
311        while(true) {
      // maxSize is allowed to be 0; load smoothing is disabled in that case
313    
314          // process all nodes and create splits that are local to a node. Generate
315          // one split per node iteration, and walk over nodes multiple times to
316          // distribute the splits across nodes. 
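      // For example, if node A holds blocks for three full splits and node B
      // for one, successive passes emit splits in the order A, B, A, A rather
      // than A, A, A, B (assuming A is visited before B).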
317          for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
318              .entrySet().iterator(); iter.hasNext();) {
319            Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
320            
321            String node = one.getKey();
322            
323            // Skip the node if it has previously been marked as completed.
324            if (completedNodes.contains(node)) {
325              continue;
326            }
327    
328            Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
329    
330            // for each block, copy it into validBlocks. Delete it from
331            // blockToNodes so that the same block does not appear in
332            // two different splits.
333            Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
334            while (oneBlockIter.hasNext()) {
335              OneBlockInfo oneblock = oneBlockIter.next();
336              
337              // Remove all blocks which may already have been assigned to other
338              // splits.
339              if(!blockToNodes.containsKey(oneblock)) {
340                oneBlockIter.remove();
341                continue;
342              }
343            
344              validBlocks.add(oneblock);
345              blockToNodes.remove(oneblock);
346              curSplitSize += oneblock.length;
347    
348              // if the accumulated split size exceeds the maximum, then
349              // create this split.
350              if (maxSize != 0 && curSplitSize >= maxSize) {
351                // create an input split and add it to the splits array
352                addCreatedSplit(splits, Collections.singleton(node), validBlocks);
353                totalLength -= curSplitSize;
354                curSplitSize = 0;
355    
356                splitsPerNode.add(node);
357    
            // Remove entries from blocksInCurrentNode so that we don't walk
            // these again.
360                blocksInCurrentNode.removeAll(validBlocks);
361                validBlocks.clear();
362    
363                // Done creating a single split for this node. Move on to the next
364                // node so that splits are distributed across nodes.
365                break;
366              }
367    
368            }
369            if (validBlocks.size() != 0) {
370              // This implies that the last few blocks (or all in case maxSize=0)
371              // were not part of a split. The node is complete.
372              
373              // if there were any blocks left over and their combined size is
374              // larger than minSplitNode, then combine them into one split.
375              // Otherwise add them back to the unprocessed pool. It is likely
376              // that they will be combined with other blocks from the
377              // same rack later on.
378              // This condition also kicks in when max split size is not set. All
379              // blocks on a node will be grouped together into a single split.
380              if (minSizeNode != 0 && curSplitSize >= minSizeNode
381                  && splitsPerNode.count(node) == 0) {
            // No split has been created on this node yet, so it is OK to add
            // a smaller one for parallelism. (Otherwise the blocks would be
            // grouped with the rest of the rack for better-balanced sizes.)
            // Create an input split and add it to the splits list.
386                addCreatedSplit(splits, Collections.singleton(node), validBlocks);
387                totalLength -= curSplitSize;
388                splitsPerNode.add(node);
            // Remove entries from blocksInCurrentNode so that we don't walk
            // this again.
390                blocksInCurrentNode.removeAll(validBlocks);
391                // The node is done. This was the last set of blocks for this node.
392              } else {
393                // Put the unplaced blocks back into the pool for later rack-allocation.
394                for (OneBlockInfo oneblock : validBlocks) {
395                  blockToNodes.put(oneblock, oneblock.hosts);
396                }
397              }
398              validBlocks.clear();
399              curSplitSize = 0;
400              completedNodes.add(node);
401            } else { // No in-flight blocks.
402              if (blocksInCurrentNode.size() == 0) {
403                // Node is done. All blocks were fit into node-local splits.
404                completedNodes.add(node);
405              } // else Run through the node again.
406            }
407          }
408    
409          // Check if node-local assignments are complete.
410          if (completedNodes.size() == totalNodes || totalLength == 0) {
        // All nodes have been walked over and marked as completed or all blocks
        // have been assigned. The rest should be handled via rack-local assignment.
        LOG.info("Terminated node allocation with: completedNodes: "
            + completedNodes.size() + ", size left: " + totalLength);
415            break;
416          }
417        }
418    
419        // if blocks in a rack are below the specified minimum size, then keep them
420        // in 'overflow'. After the processing of all racks is complete, these 
421        // overflow blocks will be combined into splits.
422        ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
423        Set<String> racks = new HashSet<String>();
424    
425        // Process all racks over and over again until there is no more work to do.
426        while (blockToNodes.size() > 0) {
427    
      // Create one split for this rack before moving on to the next rack.
      // Come back to this rack after creating a single split for each of the
      // remaining racks.
      // Process one rack at a time: combine all pending blocks that reside on
      // this rack into one split, constrained by the minimum and maximum
      // split sizes.
434    
435          // iterate over all racks 
436          for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
437               rackToBlocks.entrySet().iterator(); iter.hasNext();) {
438    
439            Map.Entry<String, List<OneBlockInfo>> one = iter.next();
440            racks.add(one.getKey());
441            List<OneBlockInfo> blocks = one.getValue();
442    
443            // for each block, copy it into validBlocks. Delete it from 
444            // blockToNodes so that the same block does not appear in 
445            // two different splits.
446            boolean createdSplit = false;
447            for (OneBlockInfo oneblock : blocks) {
448              if (blockToNodes.containsKey(oneblock)) {
449                validBlocks.add(oneblock);
450                blockToNodes.remove(oneblock);
451                curSplitSize += oneblock.length;
452          
453                // if the accumulated split size exceeds the maximum, then 
454                // create this split.
455                if (maxSize != 0 && curSplitSize >= maxSize) {
456                  // create an input split and add it to the splits array
457                  addCreatedSplit(splits, getHosts(racks), validBlocks);
458                  createdSplit = true;
459                  break;
460                }
461              }
462            }
463    
464            // if we created a split, then just go to the next rack
465            if (createdSplit) {
466              curSplitSize = 0;
467              validBlocks.clear();
468              racks.clear();
469              continue;
470            }
471    
472            if (!validBlocks.isEmpty()) {
473              if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
474                // if there is a minimum size specified, then create a single split
475                // otherwise, store these blocks into overflow data structure
476                addCreatedSplit(splits, getHosts(racks), validBlocks);
477              } else {
            // There were a few blocks in this rack that remained to be
            // processed. Keep them in the 'overflow' block list; they will
            // be combined later.
481                overflowBlocks.addAll(validBlocks);
482              }
483            }
484            curSplitSize = 0;
485            validBlocks.clear();
486            racks.clear();
487          }
488        }
489    
490        assert blockToNodes.isEmpty();
491        assert curSplitSize == 0;
492        assert validBlocks.isEmpty();
493        assert racks.isEmpty();
494    
495        // Process all overflow blocks
496        for (OneBlockInfo oneblock : overflowBlocks) {
497          validBlocks.add(oneblock);
498          curSplitSize += oneblock.length;
499    
      // This might cause an existing rack location to be re-added,
501          // but it should be ok.
502          for (int i = 0; i < oneblock.racks.length; i++) {
503            racks.add(oneblock.racks[i]);
504          }
505    
506          // if the accumulated split size exceeds the maximum, then 
507          // create this split.
508          if (maxSize != 0 && curSplitSize >= maxSize) {
509            // create an input split and add it to the splits array
510            addCreatedSplit(splits, getHosts(racks), validBlocks);
511            curSplitSize = 0;
512            validBlocks.clear();
513            racks.clear();
514          }
515        }
516    
    // Process any remaining overflow blocks.
518        if (!validBlocks.isEmpty()) {
519          addCreatedSplit(splits, getHosts(racks), validBlocks);
520        }
521      }
522    
523      /**
   * Create a single split from the list of blocks specified in validBlocks.
   * Add this new split to splitList.
526       */
527      private void addCreatedSplit(List<InputSplit> splitList, 
528                                   Collection<String> locations, 
529                                   ArrayList<OneBlockInfo> validBlocks) {
530        // create an input split
531        Path[] fl = new Path[validBlocks.size()];
532        long[] offset = new long[validBlocks.size()];
533        long[] length = new long[validBlocks.size()];
534        for (int i = 0; i < validBlocks.size(); i++) {
535          fl[i] = validBlocks.get(i).onepath; 
536          offset[i] = validBlocks.get(i).offset;
537          length[i] = validBlocks.get(i).length;
538        }
539         // add this split to the list that is returned
540        CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
541                                       length, locations.toArray(new String[0]));
542        splitList.add(thissplit); 
543      }
544    
545      /**
   * Subclasses must implement this to create a {@link RecordReader} for the
   * chunks of a {@link CombineFileSplit}.
547       */
548      public abstract RecordReader<K, V> createRecordReader(InputSplit split,
549          TaskAttemptContext context) throws IOException;
550    
551      /**
552       * information about one file from the File System
553       */
554      @VisibleForTesting
555      static class OneFileInfo {
556        private long fileSize;               // size of the file
557        private OneBlockInfo[] blocks;       // all blocks in this file
558    
559        OneFileInfo(FileStatus stat, Configuration conf,
560                    boolean isSplitable,
561                    HashMap<String, List<OneBlockInfo>> rackToBlocks,
562                    HashMap<OneBlockInfo, String[]> blockToNodes,
563                    HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
564                    HashMap<String, Set<String>> rackToNodes,
565                    long maxSize)
566                    throws IOException {
567          this.fileSize = 0;
568    
569          // get block locations from file system
570          BlockLocation[] locations;
571          if (stat instanceof LocatedFileStatus) {
572            locations = ((LocatedFileStatus) stat).getBlockLocations();
573          } else {
574            FileSystem fs = stat.getPath().getFileSystem(conf);
575            locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
576          }
      // create a list of all blocks and their locations
578          if (locations == null) {
579            blocks = new OneBlockInfo[0];
580          } else {
581    
582            if(locations.length == 0 && !stat.isDirectory()) {
583              locations = new BlockLocation[] { new BlockLocation() };
584            }
585    
586            if (!isSplitable) {
587              // if the file is not splitable, just create the one block with
588              // full file length
589              blocks = new OneBlockInfo[1];
590              fileSize = stat.getLen();
591              blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
592                  locations[0].getHosts(), locations[0].getTopologyPaths());
593            } else {
594              ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
595                  locations.length);
596              for (int i = 0; i < locations.length; i++) {
597                fileSize += locations[i].getLength();
598    
599                // each split can be a maximum of maxSize
600                long left = locations[i].getLength();
601                long myOffset = locations[i].getOffset();
602                long myLength = 0;
603                do {
604                  if (maxSize == 0) {
605                    myLength = left;
606                  } else {
607                    if (left > maxSize && left < 2 * maxSize) {
608                      // if remainder is between max and 2*max - then
609                      // instead of creating splits of size max, left-max we
610                      // create splits of size left/2 and left/2. This is
611                      // a heuristic to avoid creating really really small
612                      // splits.
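                  // For example, with maxSize = 128 MB a remaining 192 MB is
                  // split into two 96 MB blocks rather than 128 MB + 64 MB.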
613                      myLength = left / 2;
614                    } else {
615                      myLength = Math.min(maxSize, left);
616                    }
617                  }
618                  OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
619                      myOffset, myLength, locations[i].getHosts(),
620                      locations[i].getTopologyPaths());
621                  left -= myLength;
622                  myOffset += myLength;
623    
624                  blocksList.add(oneblock);
625                } while (left > 0);
626              }
627              blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
628            }
629            
630            populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
631                              nodeToBlocks, rackToNodes);
632          }
633        }
634        
635        @VisibleForTesting
636        static void populateBlockInfo(OneBlockInfo[] blocks,
637                              Map<String, List<OneBlockInfo>> rackToBlocks,
638                              Map<OneBlockInfo, String[]> blockToNodes,
639                              Map<String, Set<OneBlockInfo>> nodeToBlocks,
640                              Map<String, Set<String>> rackToNodes) {
641          for (OneBlockInfo oneblock : blocks) {
642            // add this block to the block --> node locations map
643            blockToNodes.put(oneblock, oneblock.hosts);
644    
        // For blocks that do not have host/rack information,
        // assign them to the default rack.
647            String[] racks = null;
648            if (oneblock.hosts.length == 0) {
649              racks = new String[]{NetworkTopology.DEFAULT_RACK};
650            } else {
651              racks = oneblock.racks;
652            }
653    
654            // add this block to the rack --> block map
655            for (int j = 0; j < racks.length; j++) {
656              String rack = racks[j];
657              List<OneBlockInfo> blklist = rackToBlocks.get(rack);
658              if (blklist == null) {
659                blklist = new ArrayList<OneBlockInfo>();
660                rackToBlocks.put(rack, blklist);
661              }
662              blklist.add(oneblock);
663              if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
664                // Add this host to rackToNodes map
665                addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
666              }
667            }
668    
669            // add this block to the node --> block map
670            for (int j = 0; j < oneblock.hosts.length; j++) {
671              String node = oneblock.hosts[j];
672              Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
673              if (blklist == null) {
674                blklist = new LinkedHashSet<OneBlockInfo>();
675                nodeToBlocks.put(node, blklist);
676              }
677              blklist.add(oneblock);
678            }
679          }
680        }
681    
682        long getLength() {
683          return fileSize;
684        }
685    
686        OneBlockInfo[] getBlocks() {
687          return blocks;
688        }
689      }
690    
691      /**
692       * information about one block from the File System
693       */
694      @VisibleForTesting
695      static class OneBlockInfo {
696        Path onepath;                // name of this file
697        long offset;                 // offset in file
698        long length;                 // length of this block
699        String[] hosts;              // nodes on which this block resides
700        String[] racks;              // network topology of hosts
701    
702        OneBlockInfo(Path path, long offset, long len, 
703                     String[] hosts, String[] topologyPaths) {
704          this.onepath = path;
705          this.offset = offset;
706          this.hosts = hosts;
707          this.length = len;
708          assert (hosts.length == topologyPaths.length ||
709                  topologyPaths.length == 0);
710    
711          // if the file system does not have any rack information, then
712          // use dummy rack location.
713          if (topologyPaths.length == 0) {
714            topologyPaths = new String[hosts.length];
715            for (int i = 0; i < topologyPaths.length; i++) {
716              topologyPaths[i] = (new NodeBase(hosts[i], 
717                                  NetworkTopology.DEFAULT_RACK)).toString();
718            }
719          }
720    
721          // The topology paths have the host name included as the last 
722          // component. Strip it.
723          this.racks = new String[topologyPaths.length];
724          for (int i = 0; i < topologyPaths.length; i++) {
725            this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
726          }
727        }
728      }
729    
730      protected BlockLocation[] getFileBlockLocations(
731        FileSystem fs, FileStatus stat) throws IOException {
732        if (stat instanceof LocatedFileStatus) {
733          return ((LocatedFileStatus) stat).getBlockLocations();
734        }
735        return fs.getFileBlockLocations(stat, 0, stat.getLen());
736      }
737    
738      private static void addHostToRack(Map<String, Set<String>> rackToNodes,
739                                        String rack, String host) {
740        Set<String> hosts = rackToNodes.get(rack);
741        if (hosts == null) {
742          hosts = new HashSet<String>();
743          rackToNodes.put(rack, hosts);
744        }
745        hosts.add(host);
746      }
747      
748      private Set<String> getHosts(Set<String> racks) {
749        Set<String> hosts = new HashSet<String>();
750        for (String rack : racks) {
751          if (rackToNodes.containsKey(rack)) {
752            hosts.addAll(rackToNodes.get(rack));
753          }
754        }
755        return hosts;
756      }
757      
758      /**
   * Accept a path only if at least one of the contained filters
   * accepts it.
761       */
762      private static class MultiPathFilter implements PathFilter {
763        private List<PathFilter> filters;
764    
765        public MultiPathFilter() {
766          this.filters = new ArrayList<PathFilter>();
767        }
768    
769        public MultiPathFilter(List<PathFilter> filters) {
770          this.filters = filters;
771        }
772    
773        public void add(PathFilter one) {
774          filters.add(one);
775        }
776    
777        public boolean accept(Path path) {
778          for (PathFilter filter : filters) {
779            if (filter.accept(path)) {
780              return true;
781            }
782          }
783          return false;
784        }
785    
786        public String toString() {
787          StringBuffer buf = new StringBuffer();
788          buf.append("[");
789          for (PathFilter f: filters) {
790            buf.append(f);
791            buf.append(",");
792          }
793          buf.append("]");
794          return buf.toString();
795        }
796      }
797    }