/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
/**
 * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
 * {@link InputFormat#getSplits(JobContext)} method.
 *
 * Splits are constructed from the files under the input paths.
 * A split cannot have files from different pools.
 * Each split returned may contain blocks from different files.
 * If a maxSplitSize is specified, then blocks on the same node are
 * combined to form a single split. Blocks that are left over are
 * then combined with other blocks in the same rack.
 * If maxSplitSize is not specified, then blocks from the same rack
 * are combined in a single split; no attempt is made to create
 * node-local splits.
 * If the maxSplitSize is equal to the block size, then this class
 * is similar to the default splitting behavior in Hadoop: each
 * block is a locally processed split.
 * Subclasses implement
 * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
 * to construct <code>RecordReader</code>'s for
 * <code>CombineFileSplit</code>'s.
 *
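 * <p>
 * A minimal illustrative subclass (a sketch: {@code MyReaderWrapper} is a
 * hypothetical per-chunk reader with the constructor that
 * {@link CombineFileRecordReader} expects; it is not part of this API):
 * <pre>{@code
 * public class MyCombineFormat
 *     extends CombineFileInputFormat<LongWritable, Text> {
 *
 *   public MyCombineFormat() {
 *     setMaxSplitSize(128 * 1024 * 1024); // cap each combined split at 128MB
 *   }
 *
 *   public RecordReader<LongWritable, Text> createRecordReader(
 *       InputSplit split, TaskAttemptContext context) throws IOException {
 *     // delegate to one reader per file chunk in the combined split
 *     return new CombineFileRecordReader<LongWritable, Text>(
 *         (CombineFileSplit) split, context, MyReaderWrapper.class);
 *   }
 * }
 * }</pre>
 *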
 * @see CombineFileSplit
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class CombineFileInputFormat<K, V>
  extends FileInputFormat<K, V> {

  private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);

  public static final String SPLIT_MINSIZE_PERNODE =
    "mapreduce.input.fileinputformat.split.minsize.per.node";
  public static final String SPLIT_MINSIZE_PERRACK =
    "mapreduce.input.fileinputformat.split.minsize.per.rack";

  // ability to limit the size of a single split
  private long maxSplitSize = 0;
  private long minSplitSizeNode = 0;
  private long minSplitSizeRack = 0;

  // A pool of input path filters. A split cannot have blocks from files
  // across multiple pools.
  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();

  // mapping from a rack name to the set of nodes in the rack
  private HashMap<String, Set<String>> rackToNodes =
                            new HashMap<String, Set<String>>();

  /**
   * Specify the maximum size (in bytes) of each split. Each split is
   * approximately equal to the specified size.
   */
  protected void setMaxSplitSize(long maxSplitSize) {
    this.maxSplitSize = maxSplitSize;
  }

  /**
   * Specify the minimum size (in bytes) of each split per node.
   * This applies to data that is left over after combining data on a single
   * node into splits that are of maximum size specified by maxSplitSize.
   * This leftover data will be combined into its own split if its size
   * exceeds minSplitSizeNode.
   */
  protected void setMinSplitSizeNode(long minSplitSizeNode) {
    this.minSplitSizeNode = minSplitSizeNode;
  }

  /**
   * Specify the minimum size (in bytes) of each split per rack.
   * This applies to data that is left over after combining data on a single
   * rack into splits that are of maximum size specified by maxSplitSize.
   * This leftover data will be combined into its own split if its size
   * exceeds minSplitSizeRack.
   */
  protected void setMinSplitSizeRack(long minSplitSizeRack) {
    this.minSplitSizeRack = minSplitSizeRack;
  }
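
  /*
   * Equivalent job-level configuration (a sketch; the sizes below are
   * illustrative, not defaults):
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L << 20);
   *   conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 64L << 20);
   *   conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 128L << 20);
   *
   * Values passed to the setters above take precedence over these keys.
   */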

  /**
   * Create a new pool and add the filters to it.
   * A split cannot have files from different pools.
   */
  protected void createPool(List<PathFilter> filters) {
    pools.add(new MultiPathFilter(filters));
  }

  /**
   * Create a new pool and add the filters to it.
   * A pathname can satisfy any one of the specified filters.
   * A split cannot have files from different pools.
   */
  protected void createPool(PathFilter... filters) {
    MultiPathFilter multi = new MultiPathFilter();
    for (PathFilter f : filters) {
      multi.add(f);
    }
    pools.add(multi);
  }
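
  /*
   * Example (a sketch): a subclass could keep plain-text and gzipped inputs
   * in separate pools so that they never share a split:
   *
   *   createPool(new PathFilter() {
   *     public boolean accept(Path path) { return path.getName().endsWith(".txt"); }
   *   });
   *   createPool(new PathFilter() {
   *     public boolean accept(Path path) { return path.getName().endsWith(".gz"); }
   *   });
   */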

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
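    // For example, gzip output is not splittable, while bzip2 is (its codec
    // implements SplittableCompressionCodec).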
    return codec instanceof SplittableCompressionCodec;
  }

  /**
   * Default constructor.
   */
  public CombineFileInputFormat() {
  }

  @Override
  public List<InputSplit> getSplits(JobContext job)
    throws IOException {
    long minSizeNode = 0;
    long minSizeRack = 0;
    long maxSize = 0;
    Configuration conf = job.getConfiguration();

    // the values specified by setXxxSplitSize() take precedence over the
    // values that might have been specified in the config
    if (minSplitSizeNode != 0) {
      minSizeNode = minSplitSizeNode;
    } else {
      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
    }
    if (minSplitSizeRack != 0) {
      minSizeRack = minSplitSizeRack;
    } else {
      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
    }
    if (maxSplitSize != 0) {
      maxSize = maxSplitSize;
    } else {
      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
      // If maxSize is not configured, a single split will be generated per
      // node.
    }
    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
      throw new IOException("Minimum split size per rack " + minSizeRack +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than minimum split " +
                            "size per rack " + minSizeRack);
    }

    // all the files in input set
    List<FileStatus> stats = listStatus(job);
    List<InputSplit> splits = new ArrayList<InputSplit>();
    if (stats.size() == 0) {
      return splits;
    }

    // In one single iteration, process all the paths in a single pool.
    // Processing one pool at a time ensures that a split contains paths
    // from a single pool only.
    for (MultiPathFilter onepool : pools) {
      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

      // pick one input path. If it matches any of the filters in the pool,
      // add it to this pool's set
      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
        FileStatus p = iter.next();
        if (onepool.accept(p.getPath())) {
          myPaths.add(p); // add it to my output set
          iter.remove();
        }
      }
      // create splits for all files in this pool.
      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
    }

    // create splits for all files that are not in any pool.
    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

    // free up rackToNodes map
    rackToNodes.clear();
    return splits;
  }

  /**
   * Return all the splits in the specified set of paths.
   */
  private void getMoreSplits(JobContext job, List<FileStatus> stats,
                             long maxSize, long minSizeNode, long minSizeRack,
                             List<InputSplit> splits)
    throws IOException {
    Configuration conf = job.getConfiguration();

    // all blocks for all the files in input set
    OneFileInfo[] files;

    // mapping from a rack name to the list of blocks it has
    HashMap<String, List<OneBlockInfo>> rackToBlocks =
                              new HashMap<String, List<OneBlockInfo>>();

    // mapping from a block to the nodes on which it has replicas
    HashMap<OneBlockInfo, String[]> blockToNodes =
                              new HashMap<OneBlockInfo, String[]>();

    // mapping from a node to the list of blocks that it contains
    HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
                              new HashMap<String, Set<OneBlockInfo>>();

    files = new OneFileInfo[stats.size()];
    if (stats.size() == 0) {
      return;
    }

    // populate all the blocks for all files
    long totLength = 0;
    int i = 0;
    for (FileStatus stat : stats) {
      files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
                                 rackToBlocks, blockToNodes, nodeToBlocks,
                                 rackToNodes, maxSize);
      totLength += files[i].getLength();
      i++; // advance to the next file's slot
    }
    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
                 maxSize, minSizeNode, minSizeRack, splits);
  }

  /**
   * Process all the nodes and create splits that are local to a node.
   * Generate one split per node iteration, and walk over nodes multiple times
   * to distribute the splits across nodes.
   * <p>
   * Note: The order in which nodes are processed is not deterministic, because
   * nodeToBlocks is a {@link java.util.HashMap} and its iteration order is
   * unspecified.
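   * <p>
   * Worked example (illustrative numbers): with maxSize = 100 and a node
   * holding blocks of sizes 60, 60 and 30, the node pass emits one split of
   * size 120 (60 + 60). The leftover 30 becomes its own node-local split
   * only if minSizeNode &lt;= 30 and no split was created for that node yet;
   * otherwise it is returned to blockToNodes for rack-level packing.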
   * @param nodeToBlocks Mapping from a node to the list of blocks that
   *                     it contains.
   * @param blockToNodes Mapping from a block to the nodes on which
   *                     it has replicas.
   * @param rackToBlocks Mapping from a rack name to the list of blocks it has.
   * @param totLength Total length of the input files.
   * @param maxSize Max size of each split.
   *                If set to 0, no maximum is enforced (load smoothing is
   *                disabled).
   * @param minSizeNode Minimum split size per node.
   * @param minSizeRack Minimum split size per rack.
   * @param splits New splits created by this method are added to this list.
   */
  @VisibleForTesting
  void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                    Map<OneBlockInfo, String[]> blockToNodes,
                    Map<String, List<OneBlockInfo>> rackToBlocks,
                    long totLength,
                    long maxSize,
                    long minSizeNode,
                    long minSizeRack,
                    List<InputSplit> splits) {
    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
    long curSplitSize = 0;

    int totalNodes = nodeToBlocks.size();
    long totalLength = totLength;

    Multiset<String> splitsPerNode = HashMultiset.create();
    Set<String> completedNodes = new HashSet<String>();

    while (true) {
      for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
          .entrySet().iterator(); iter.hasNext();) {
        Map.Entry<String, Set<OneBlockInfo>> one = iter.next();

        String node = one.getKey();

        // Skip the node if it has previously been marked as completed.
        if (completedNodes.contains(node)) {
          continue;
        }

        Set<OneBlockInfo> blocksInCurrentNode = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
        while (oneBlockIter.hasNext()) {
          OneBlockInfo oneblock = oneBlockIter.next();

          // Remove all blocks which may already have been assigned to other
          // splits.
          if (!blockToNodes.containsKey(oneblock)) {
            oneBlockIter.remove();
            continue;
          }

          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size exceeds the maximum, then
          // create this split.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            curSplitSize = 0;

            splitsPerNode.add(node);

            // Remove entries from blocksInCurrentNode so that we don't walk
            // these again.
            blocksInCurrentNode.removeAll(validBlocks);
            validBlocks.clear();

            // Done creating a single split for this node. Move on to the next
            // node so that splits are distributed across nodes.
            break;
          }
        }
        if (validBlocks.size() != 0) {
          // This implies that the last few blocks (or all, in case
          // maxSize = 0) were not part of a split. The node is complete.

          // if there were any blocks left over and their combined size is
          // larger than minSizeNode, then combine them into one split.
          // Otherwise add them back to the unprocessed pool. It is likely
          // that they will be combined with other blocks from the
          // same rack later on.
          // This condition also kicks in when max split size is not set. All
          // blocks on a node will be grouped together into a single split.
          if (minSizeNode != 0 && curSplitSize >= minSizeNode
              && splitsPerNode.count(node) == 0) {
            // No split has been created on this node yet, so it is OK to add
            // a smaller one for parallelism. Otherwise, group the blocks at
            // rack level for more balanced split sizes.
            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
            totalLength -= curSplitSize;
            splitsPerNode.add(node);
            // Remove entries from blocksInCurrentNode so that we don't walk
            // this again.
            blocksInCurrentNode.removeAll(validBlocks);
            // The node is done. This was the last set of blocks for this node.
          } else {
            // Put the unplaced blocks back into the pool for later
            // rack-level allocation.
            for (OneBlockInfo oneblock : validBlocks) {
              blockToNodes.put(oneblock, oneblock.hosts);
            }
          }
          validBlocks.clear();
          curSplitSize = 0;
          completedNodes.add(node);
        } else { // No in-flight blocks.
          if (blocksInCurrentNode.size() == 0) {
            // Node is done. All blocks were fit into node-local splits.
            completedNodes.add(node);
          } // else run through this node again.
        }
      }

      // Check if node-local assignments are complete.
      if (completedNodes.size() == totalNodes || totalLength == 0) {
        // All nodes have been walked over and marked as completed, or all
        // blocks have been assigned. The rest are handled via the rack-local
        // assignment below.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Terminated node allocation. Completed nodes: "
              + completedNodes.size() + ", size left: " + totalLength);
        }
        break;
      }
    }

    // if blocks in a rack are below the specified minimum size, then keep them
    // in 'overflow'. After the processing of all racks is complete, these
    // overflow blocks will be combined into splits.
    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
    Set<String> racks = new HashSet<String>();

    // Process all racks over and over again until there is no more work to do.
    while (blockToNodes.size() > 0) {

      // Create at most one split per rack before moving on to the next rack,
      // so that splits are distributed across racks. For each rack, combine
      // as many of its remaining blocks as possible into one split,
      // constrained by the minimum and maximum split sizes.

      // iterate over all racks
      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
           rackToBlocks.entrySet().iterator(); iter.hasNext();) {

        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
        racks.add(one.getKey());
        List<OneBlockInfo> blocks = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        boolean createdSplit = false;
        for (OneBlockInfo oneblock : blocks) {
          if (blockToNodes.containsKey(oneblock)) {
            validBlocks.add(oneblock);
            blockToNodes.remove(oneblock);
            curSplitSize += oneblock.length;

            // if the accumulated split size exceeds the maximum, then
            // create this split.
            if (maxSize != 0 && curSplitSize >= maxSize) {
              // create an input split and add it to the splits array
              addCreatedSplit(splits, getHosts(racks), validBlocks);
              createdSplit = true;
              break;
            }
          }
        }

        // if we created a split, then just go to the next rack
        if (createdSplit) {
          curSplitSize = 0;
          validBlocks.clear();
          racks.clear();
          continue;
        }

        if (!validBlocks.isEmpty()) {
          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
            // if there is a minimum size specified, then create a single split
            addCreatedSplit(splits, getHosts(racks), validBlocks);
          } else {
            // There were a few blocks in this rack that remained to be
            // processed. Keep them in the 'overflow' block list; these will
            // be combined later.
            overflowBlocks.addAll(validBlocks);
          }
        }
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    assert blockToNodes.isEmpty();
    assert curSplitSize == 0;
    assert validBlocks.isEmpty();
    assert racks.isEmpty();

    // Process all overflow blocks
    for (OneBlockInfo oneblock : overflowBlocks) {
      validBlocks.add(oneblock);
      curSplitSize += oneblock.length;

      // This might cause an existing rack location to be re-added,
      // but it should be ok.
      for (int i = 0; i < oneblock.racks.length; i++) {
        racks.add(oneblock.racks[i]);
      }

      // if the accumulated split size exceeds the maximum, then
      // create this split.
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(splits, getHosts(racks), validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

    // Process any remaining blocks.
    if (!validBlocks.isEmpty()) {
      addCreatedSplit(splits, getHosts(racks), validBlocks);
    }
  }

  /**
   * Create a single split from the list of blocks specified in validBlocks.
   * Add this new split into splitList.
   */
  private void addCreatedSplit(List<InputSplit> splitList,
                               Collection<String> locations,
                               ArrayList<OneBlockInfo> validBlocks) {
    // create an input split
    Path[] fl = new Path[validBlocks.size()];
    long[] offset = new long[validBlocks.size()];
    long[] length = new long[validBlocks.size()];
    for (int i = 0; i < validBlocks.size(); i++) {
      fl[i] = validBlocks.get(i).onepath;
      offset[i] = validBlocks.get(i).offset;
      length[i] = validBlocks.get(i).length;
    }
    // add this split to the list that is returned
    CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
                                   length, locations.toArray(new String[0]));
    splitList.add(thissplit);
  }

  /**
   * Subclasses must implement this method to construct a
   * {@link RecordReader} for a {@link CombineFileSplit}.
   */
  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException;

  /**
   * Information about one file from the file system.
   */
  @VisibleForTesting
  static class OneFileInfo {
    private long fileSize;               // size of the file
    private OneBlockInfo[] blocks;       // all blocks in this file

    OneFileInfo(FileStatus stat, Configuration conf,
                boolean isSplitable,
                HashMap<String, List<OneBlockInfo>> rackToBlocks,
                HashMap<OneBlockInfo, String[]> blockToNodes,
                HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
                HashMap<String, Set<String>> rackToNodes,
                long maxSize)
                throws IOException {
      this.fileSize = 0;

      // get block locations from file system
      BlockLocation[] locations;
      if (stat instanceof LocatedFileStatus) {
        locations = ((LocatedFileStatus) stat).getBlockLocations();
      } else {
        FileSystem fs = stat.getPath().getFileSystem(conf);
        locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
      }
      // create a list of all blocks and their locations
      if (locations == null) {
        blocks = new OneBlockInfo[0];
      } else {

        if (locations.length == 0 && !stat.isDirectory()) {
          locations = new BlockLocation[] { new BlockLocation() };
        }

        if (!isSplitable) {
          // if the file is not splitable, just create the one block with
          // full file length
          blocks = new OneBlockInfo[1];
          fileSize = stat.getLen();
          blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
              locations[0].getHosts(), locations[0].getTopologyPaths());
        } else {
          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
              locations.length);
          for (int i = 0; i < locations.length; i++) {
            fileSize += locations[i].getLength();

            // each split can be a maximum of maxSize
            long left = locations[i].getLength();
            long myOffset = locations[i].getOffset();
            long myLength = 0;
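            // Worked example (illustrative sizes): a 350MB location with
            // maxSize = 128MB is chopped as 128MB first; the 222MB remainder
            // falls between maxSize and 2*maxSize, so it is halved into two
            // 111MB pieces rather than split as 128MB + 94MB.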
            do {
              if (maxSize == 0) {
                myLength = left;
              } else {
                if (left > maxSize && left < 2 * maxSize) {
                  // if the remainder is between max and 2*max, then
                  // instead of creating splits of size max and left-max,
                  // we create two splits of size left/2 each. This is
                  // a heuristic to avoid creating really small splits.
                  myLength = left / 2;
                } else {
                  myLength = Math.min(maxSize, left);
                }
              }
              OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
                  myOffset, myLength, locations[i].getHosts(),
                  locations[i].getTopologyPaths());
              left -= myLength;
              myOffset += myLength;

              blocksList.add(oneblock);
            } while (left > 0);
          }
          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
        }

        populateBlockInfo(blocks, rackToBlocks, blockToNodes,
                          nodeToBlocks, rackToNodes);
      }
    }

    @VisibleForTesting
    static void populateBlockInfo(OneBlockInfo[] blocks,
                          Map<String, List<OneBlockInfo>> rackToBlocks,
                          Map<OneBlockInfo, String[]> blockToNodes,
                          Map<String, Set<OneBlockInfo>> nodeToBlocks,
                          Map<String, Set<String>> rackToNodes) {
      for (OneBlockInfo oneblock : blocks) {
        // add this block to the block --> node locations map
        blockToNodes.put(oneblock, oneblock.hosts);

        // For blocks that do not have host/rack information,
        // assign to default rack.
        String[] racks = null;
        if (oneblock.hosts.length == 0) {
          racks = new String[]{NetworkTopology.DEFAULT_RACK};
        } else {
          racks = oneblock.racks;
        }

        // add this block to the rack --> block map
        for (int j = 0; j < racks.length; j++) {
          String rack = racks[j];
          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
          if (blklist == null) {
            blklist = new ArrayList<OneBlockInfo>();
            rackToBlocks.put(rack, blklist);
          }
          blklist.add(oneblock);
          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
            // Add this host to rackToNodes map
            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
          }
        }

        // add this block to the node --> block map
        for (int j = 0; j < oneblock.hosts.length; j++) {
          String node = oneblock.hosts[j];
          Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
          if (blklist == null) {
            blklist = new LinkedHashSet<OneBlockInfo>();
            nodeToBlocks.put(node, blklist);
          }
          blklist.add(oneblock);
        }
      }
    }

    long getLength() {
      return fileSize;
    }

    OneBlockInfo[] getBlocks() {
      return blocks;
    }
  }

  /**
   * Information about one block from the file system.
   */
  @VisibleForTesting
  static class OneBlockInfo {
    Path onepath;                // name of this file
    long offset;                 // offset in file
    long length;                 // length of this block
    String[] hosts;              // nodes on which this block resides
    String[] racks;              // network topology of hosts

    OneBlockInfo(Path path, long offset, long len,
                 String[] hosts, String[] topologyPaths) {
      this.onepath = path;
      this.offset = offset;
      this.hosts = hosts;
      this.length = len;
      assert (hosts.length == topologyPaths.length ||
              topologyPaths.length == 0);

      // if the file system does not have any rack information, then
      // use dummy rack location.
      if (topologyPaths.length == 0) {
        topologyPaths = new String[hosts.length];
        for (int i = 0; i < topologyPaths.length; i++) {
          topologyPaths[i] = (new NodeBase(hosts[i],
                              NetworkTopology.DEFAULT_RACK)).toString();
        }
      }

      // The topology paths include the host name as the last
      // component. Strip it.
      this.racks = new String[topologyPaths.length];
      for (int i = 0; i < topologyPaths.length; i++) {
        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
      }
    }
  }

  protected BlockLocation[] getFileBlockLocations(
    FileSystem fs, FileStatus stat) throws IOException {
    if (stat instanceof LocatedFileStatus) {
      return ((LocatedFileStatus) stat).getBlockLocations();
    }
    return fs.getFileBlockLocations(stat, 0, stat.getLen());
  }

  private static void addHostToRack(Map<String, Set<String>> rackToNodes,
                                    String rack, String host) {
    Set<String> hosts = rackToNodes.get(rack);
    if (hosts == null) {
      hosts = new HashSet<String>();
      rackToNodes.put(rack, hosts);
    }
    hosts.add(host);
  }

  private Set<String> getHosts(Set<String> racks) {
    Set<String> hosts = new HashSet<String>();
    for (String rack : racks) {
      if (rackToNodes.containsKey(rack)) {
        hosts.addAll(rackToNodes.get(rack));
      }
    }
    return hosts;
  }

  /**
   * Accept a path only if at least one of the filters given in the
   * constructor accepts it.
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter() {
      this.filters = new ArrayList<PathFilter>();
    }

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public void add(PathFilter one) {
      filters.add(one);
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (filter.accept(path)) {
          return true;
        }
      }
      return false;
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder();
      buf.append("[");
      for (PathFilter f : filters) {
        buf.append(f);
        buf.append(",");
      }
      buf.append("]");
      return buf.toString();
    }
  }
}