001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.input;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.LinkedHashSet;
026import java.util.HashSet;
027import java.util.List;
028import java.util.HashMap;
029import java.util.Set;
030import java.util.Iterator;
031import java.util.Map;
032import java.util.Map.Entry;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.apache.hadoop.classification.InterfaceAudience;
037import org.apache.hadoop.classification.InterfaceStability;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.LocatedFileStatus;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.fs.BlockLocation;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.PathFilter;
045import org.apache.hadoop.io.compress.CompressionCodec;
046import org.apache.hadoop.io.compress.CompressionCodecFactory;
047import org.apache.hadoop.io.compress.SplittableCompressionCodec;
048import org.apache.hadoop.mapreduce.InputFormat;
049import org.apache.hadoop.mapreduce.InputSplit;
050import org.apache.hadoop.mapreduce.JobContext;
051import org.apache.hadoop.mapreduce.RecordReader;
052import org.apache.hadoop.mapreduce.TaskAttemptContext;
053import org.apache.hadoop.net.NodeBase;
054import org.apache.hadoop.net.NetworkTopology;
055
056import com.google.common.annotations.VisibleForTesting;
057import com.google.common.collect.HashMultiset;
058import com.google.common.collect.Multiset;
059
060/**
061 * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in 
062 * {@link InputFormat#getSplits(JobContext)} method. 
063 * 
064 * Splits are constructed from the files under the input paths. 
065 * A split cannot have files from different pools.
066 * Each split returned may contain blocks from different files.
067 * If a maxSplitSize is specified, then blocks on the same node are
068 * combined to form a single split. Blocks that are left over are
069 * then combined with other blocks in the same rack. 
070 * If maxSplitSize is not specified, then blocks from the same rack
071 * are combined in a single split; no attempt is made to create
072 * node-local splits.
073 * If the maxSplitSize is equal to the block size, then this class
074 * is similar to the default splitting behavior in Hadoop: each
075 * block is a locally processed split.
076 * Subclasses implement 
077 * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
078 * to construct <code>RecordReader</code>'s for 
079 * <code>CombineFileSplit</code>'s.
080 * 
081 * @see CombineFileSplit
082 */
083@InterfaceAudience.Public
084@InterfaceStability.Stable
085public abstract class CombineFileInputFormat<K, V>
086  extends FileInputFormat<K, V> {
087  
088  private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class);
089  
090  public static final String SPLIT_MINSIZE_PERNODE = 
091    "mapreduce.input.fileinputformat.split.minsize.per.node";
092  public static final String SPLIT_MINSIZE_PERRACK = 
093    "mapreduce.input.fileinputformat.split.minsize.per.rack";
094  // ability to limit the size of a single split
095  private long maxSplitSize = 0;
096  private long minSplitSizeNode = 0;
097  private long minSplitSizeRack = 0;
098
099  // A pool of input paths filters. A split cannot have blocks from files
100  // across multiple pools.
101  private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
102
103  // mapping from a rack name to the set of Nodes in the rack 
104  private HashMap<String, Set<String>> rackToNodes = 
105                            new HashMap<String, Set<String>>();
106  /**
107   * Specify the maximum size (in bytes) of each split. Each split is
108   * approximately equal to the specified size.
109   */
110  protected void setMaxSplitSize(long maxSplitSize) {
111    this.maxSplitSize = maxSplitSize;
112  }
113
114  /**
115   * Specify the minimum size (in bytes) of each split per node.
116   * This applies to data that is left over after combining data on a single
117   * node into splits that are of maximum size specified by maxSplitSize.
118   * This leftover data will be combined into its own split if its size
119   * exceeds minSplitSizeNode.
120   */
121  protected void setMinSplitSizeNode(long minSplitSizeNode) {
122    this.minSplitSizeNode = minSplitSizeNode;
123  }
124
125  /**
126   * Specify the minimum size (in bytes) of each split per rack.
127   * This applies to data that is left over after combining data on a single
128   * rack into splits that are of maximum size specified by maxSplitSize.
129   * This leftover data will be combined into its own split if its size
130   * exceeds minSplitSizeRack.
131   */
132  protected void setMinSplitSizeRack(long minSplitSizeRack) {
133    this.minSplitSizeRack = minSplitSizeRack;
134  }
135
136  /**
137   * Create a new pool and add the filters to it.
138   * A split cannot have files from different pools.
139   */
140  protected void createPool(List<PathFilter> filters) {
141    pools.add(new MultiPathFilter(filters));
142  }
143
144  /**
145   * Create a new pool and add the filters to it. 
146   * A pathname can satisfy any one of the specified filters.
147   * A split cannot have files from different pools.
148   */
149  protected void createPool(PathFilter... filters) {
150    MultiPathFilter multi = new MultiPathFilter();
151    for (PathFilter f: filters) {
152      multi.add(f);
153    }
154    pools.add(multi);
155  }
156  
157  @Override
158  protected boolean isSplitable(JobContext context, Path file) {
159    final CompressionCodec codec =
160      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
161    if (null == codec) {
162      return true;
163    }
164    return codec instanceof SplittableCompressionCodec;
165  }
166
167  /**
168   * default constructor
169   */
170  public CombineFileInputFormat() {
171  }
172
173  @Override
174  public List<InputSplit> getSplits(JobContext job) 
175    throws IOException {
176    long minSizeNode = 0;
177    long minSizeRack = 0;
178    long maxSize = 0;
179    Configuration conf = job.getConfiguration();
180
181    // the values specified by setxxxSplitSize() takes precedence over the
182    // values that might have been specified in the config
183    if (minSplitSizeNode != 0) {
184      minSizeNode = minSplitSizeNode;
185    } else {
186      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
187    }
188    if (minSplitSizeRack != 0) {
189      minSizeRack = minSplitSizeRack;
190    } else {
191      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
192    }
193    if (maxSplitSize != 0) {
194      maxSize = maxSplitSize;
195    } else {
196      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
197      // If maxSize is not configured, a single split will be generated per
198      // node.
199    }
200    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
201      throw new IOException("Minimum split size pernode " + minSizeNode +
202                            " cannot be larger than maximum split size " +
203                            maxSize);
204    }
205    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
206      throw new IOException("Minimum split size per rack " + minSizeRack +
207                            " cannot be larger than maximum split size " +
208                            maxSize);
209    }
210    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
211      throw new IOException("Minimum split size per node " + minSizeNode +
212                            " cannot be larger than minimum split " +
213                            "size per rack " + minSizeRack);
214    }
215
216    // all the files in input set
217    List<FileStatus> stats = listStatus(job);
218    List<InputSplit> splits = new ArrayList<InputSplit>();
219    if (stats.size() == 0) {
220      return splits;    
221    }
222
223    // In one single iteration, process all the paths in a single pool.
224    // Processing one pool at a time ensures that a split contains paths
225    // from a single pool only.
226    for (MultiPathFilter onepool : pools) {
227      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
228      
229      // pick one input path. If it matches all the filters in a pool,
230      // add it to the output set
231      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
232        FileStatus p = iter.next();
233        if (onepool.accept(p.getPath())) {
234          myPaths.add(p); // add it to my output set
235          iter.remove();
236        }
237      }
238      // create splits for all files in this pool.
239      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
240    }
241
242    // create splits for all files that are not in any pool.
243    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
244
245    // free up rackToNodes map
246    rackToNodes.clear();
247    return splits;    
248  }
249
250  /**
251   * Return all the splits in the specified set of paths
252   */
253  private void getMoreSplits(JobContext job, List<FileStatus> stats,
254                             long maxSize, long minSizeNode, long minSizeRack,
255                             List<InputSplit> splits)
256    throws IOException {
257    Configuration conf = job.getConfiguration();
258
259    // all blocks for all the files in input set
260    OneFileInfo[] files;
261  
262    // mapping from a rack name to the list of blocks it has
263    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
264                              new HashMap<String, List<OneBlockInfo>>();
265
266    // mapping from a block to the nodes on which it has replicas
267    HashMap<OneBlockInfo, String[]> blockToNodes = 
268                              new HashMap<OneBlockInfo, String[]>();
269
270    // mapping from a node to the list of blocks that it contains
271    HashMap<String, Set<OneBlockInfo>> nodeToBlocks = 
272                              new HashMap<String, Set<OneBlockInfo>>();
273    
274    files = new OneFileInfo[stats.size()];
275    if (stats.size() == 0) {
276      return; 
277    }
278
279    // populate all the blocks for all files
280    long totLength = 0;
281    int i = 0;
282    for (FileStatus stat : stats) {
283      files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
284                                 rackToBlocks, blockToNodes, nodeToBlocks,
285                                 rackToNodes, maxSize);
286      totLength += files[i].getLength();
287    }
288    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, 
289                 maxSize, minSizeNode, minSizeRack, splits);
290  }
291
292  @VisibleForTesting
293  void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
294                     Map<OneBlockInfo, String[]> blockToNodes,
295                     Map<String, List<OneBlockInfo>> rackToBlocks,
296                     long totLength,
297                     long maxSize,
298                     long minSizeNode,
299                     long minSizeRack,
300                     List<InputSplit> splits                     
301                    ) {
302    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
303    long curSplitSize = 0;
304    
305    int totalNodes = nodeToBlocks.size();
306    long totalLength = totLength;
307
308    Multiset<String> splitsPerNode = HashMultiset.create();
309    Set<String> completedNodes = new HashSet<String>();
310    
311    while(true) {
312      // it is allowed for maxSize to be 0. Disable smoothing load for such cases
313
314      // process all nodes and create splits that are local to a node. Generate
315      // one split per node iteration, and walk over nodes multiple times to
316      // distribute the splits across nodes. 
317      for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
318          .entrySet().iterator(); iter.hasNext();) {
319        Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
320        
321        String node = one.getKey();
322        
323        // Skip the node if it has previously been marked as completed.
324        if (completedNodes.contains(node)) {
325          continue;
326        }
327
328        Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
329
330        // for each block, copy it into validBlocks. Delete it from
331        // blockToNodes so that the same block does not appear in
332        // two different splits.
333        Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
334        while (oneBlockIter.hasNext()) {
335          OneBlockInfo oneblock = oneBlockIter.next();
336          
337          // Remove all blocks which may already have been assigned to other
338          // splits.
339          if(!blockToNodes.containsKey(oneblock)) {
340            oneBlockIter.remove();
341            continue;
342          }
343        
344          validBlocks.add(oneblock);
345          blockToNodes.remove(oneblock);
346          curSplitSize += oneblock.length;
347
348          // if the accumulated split size exceeds the maximum, then
349          // create this split.
350          if (maxSize != 0 && curSplitSize >= maxSize) {
351            // create an input split and add it to the splits array
352            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
353            totalLength -= curSplitSize;
354            curSplitSize = 0;
355
356            splitsPerNode.add(node);
357
358            // Remove entries from blocksInNode so that we don't walk these
359            // again.
360            blocksInCurrentNode.removeAll(validBlocks);
361            validBlocks.clear();
362
363            // Done creating a single split for this node. Move on to the next
364            // node so that splits are distributed across nodes.
365            break;
366          }
367
368        }
369        if (validBlocks.size() != 0) {
370          // This implies that the last few blocks (or all in case maxSize=0)
371          // were not part of a split. The node is complete.
372          
373          // if there were any blocks left over and their combined size is
374          // larger than minSplitNode, then combine them into one split.
375          // Otherwise add them back to the unprocessed pool. It is likely
376          // that they will be combined with other blocks from the
377          // same rack later on.
378          // This condition also kicks in when max split size is not set. All
379          // blocks on a node will be grouped together into a single split.
380          if (minSizeNode != 0 && curSplitSize >= minSizeNode
381              && splitsPerNode.count(node) == 0) {
382            // haven't created any split on this machine. so its ok to add a
383            // smaller one for parallelism. Otherwise group it in the rack for
384            // balanced size create an input split and add it to the splits
385            // array
386            addCreatedSplit(splits, Collections.singleton(node), validBlocks);
387            totalLength -= curSplitSize;
388            splitsPerNode.add(node);
389            // Remove entries from blocksInNode so that we don't walk this again.
390            blocksInCurrentNode.removeAll(validBlocks);
391            // The node is done. This was the last set of blocks for this node.
392          } else {
393            // Put the unplaced blocks back into the pool for later rack-allocation.
394            for (OneBlockInfo oneblock : validBlocks) {
395              blockToNodes.put(oneblock, oneblock.hosts);
396            }
397          }
398          validBlocks.clear();
399          curSplitSize = 0;
400          completedNodes.add(node);
401        } else { // No in-flight blocks.
402          if (blocksInCurrentNode.size() == 0) {
403            // Node is done. All blocks were fit into node-local splits.
404            completedNodes.add(node);
405          } // else Run through the node again.
406        }
407      }
408
409      // Check if node-local assignments are complete.
410      if (completedNodes.size() == totalNodes || totalLength == 0) {
411        // All nodes have been walked over and marked as completed or all blocks
412        // have been assigned. The rest should be handled via rackLock assignment.
413        LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
414            + completedNodes.size() + ", size left: " + totalLength);
415        break;
416      }
417    }
418
419    // if blocks in a rack are below the specified minimum size, then keep them
420    // in 'overflow'. After the processing of all racks is complete, these 
421    // overflow blocks will be combined into splits.
422    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
423    Set<String> racks = new HashSet<String>();
424
425    // Process all racks over and over again until there is no more work to do.
426    while (blockToNodes.size() > 0) {
427
428      // Create one split for this rack before moving over to the next rack. 
429      // Come back to this rack after creating a single split for each of the 
430      // remaining racks.
431      // Process one rack location at a time, Combine all possible blocks that
432      // reside on this rack as one split. (constrained by minimum and maximum
433      // split size).
434
435      // iterate over all racks 
436      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
437           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
438
439        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
440        racks.add(one.getKey());
441        List<OneBlockInfo> blocks = one.getValue();
442
443        // for each block, copy it into validBlocks. Delete it from 
444        // blockToNodes so that the same block does not appear in 
445        // two different splits.
446        boolean createdSplit = false;
447        for (OneBlockInfo oneblock : blocks) {
448          if (blockToNodes.containsKey(oneblock)) {
449            validBlocks.add(oneblock);
450            blockToNodes.remove(oneblock);
451            curSplitSize += oneblock.length;
452      
453            // if the accumulated split size exceeds the maximum, then 
454            // create this split.
455            if (maxSize != 0 && curSplitSize >= maxSize) {
456              // create an input split and add it to the splits array
457              addCreatedSplit(splits, getHosts(racks), validBlocks);
458              createdSplit = true;
459              break;
460            }
461          }
462        }
463
464        // if we created a split, then just go to the next rack
465        if (createdSplit) {
466          curSplitSize = 0;
467          validBlocks.clear();
468          racks.clear();
469          continue;
470        }
471
472        if (!validBlocks.isEmpty()) {
473          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
474            // if there is a minimum size specified, then create a single split
475            // otherwise, store these blocks into overflow data structure
476            addCreatedSplit(splits, getHosts(racks), validBlocks);
477          } else {
478            // There were a few blocks in this rack that 
479                // remained to be processed. Keep them in 'overflow' block list. 
480                // These will be combined later.
481            overflowBlocks.addAll(validBlocks);
482          }
483        }
484        curSplitSize = 0;
485        validBlocks.clear();
486        racks.clear();
487      }
488    }
489
490    assert blockToNodes.isEmpty();
491    assert curSplitSize == 0;
492    assert validBlocks.isEmpty();
493    assert racks.isEmpty();
494
495    // Process all overflow blocks
496    for (OneBlockInfo oneblock : overflowBlocks) {
497      validBlocks.add(oneblock);
498      curSplitSize += oneblock.length;
499
500      // This might cause an exiting rack location to be re-added,
501      // but it should be ok.
502      for (int i = 0; i < oneblock.racks.length; i++) {
503        racks.add(oneblock.racks[i]);
504      }
505
506      // if the accumulated split size exceeds the maximum, then 
507      // create this split.
508      if (maxSize != 0 && curSplitSize >= maxSize) {
509        // create an input split and add it to the splits array
510        addCreatedSplit(splits, getHosts(racks), validBlocks);
511        curSplitSize = 0;
512        validBlocks.clear();
513        racks.clear();
514      }
515    }
516
517    // Process any remaining blocks, if any.
518    if (!validBlocks.isEmpty()) {
519      addCreatedSplit(splits, getHosts(racks), validBlocks);
520    }
521  }
522
523  /**
524   * Create a single split from the list of blocks specified in validBlocks
525   * Add this new split into splitList.
526   */
527  private void addCreatedSplit(List<InputSplit> splitList, 
528                               Collection<String> locations, 
529                               ArrayList<OneBlockInfo> validBlocks) {
530    // create an input split
531    Path[] fl = new Path[validBlocks.size()];
532    long[] offset = new long[validBlocks.size()];
533    long[] length = new long[validBlocks.size()];
534    for (int i = 0; i < validBlocks.size(); i++) {
535      fl[i] = validBlocks.get(i).onepath; 
536      offset[i] = validBlocks.get(i).offset;
537      length[i] = validBlocks.get(i).length;
538    }
539     // add this split to the list that is returned
540    CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
541                                   length, locations.toArray(new String[0]));
542    splitList.add(thissplit); 
543  }
544
545  /**
546   * This is not implemented yet. 
547   */
548  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
549      TaskAttemptContext context) throws IOException;
550
551  /**
552   * information about one file from the File System
553   */
554  @VisibleForTesting
555  static class OneFileInfo {
556    private long fileSize;               // size of the file
557    private OneBlockInfo[] blocks;       // all blocks in this file
558
559    OneFileInfo(FileStatus stat, Configuration conf,
560                boolean isSplitable,
561                HashMap<String, List<OneBlockInfo>> rackToBlocks,
562                HashMap<OneBlockInfo, String[]> blockToNodes,
563                HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
564                HashMap<String, Set<String>> rackToNodes,
565                long maxSize)
566                throws IOException {
567      this.fileSize = 0;
568
569      // get block locations from file system
570      BlockLocation[] locations;
571      if (stat instanceof LocatedFileStatus) {
572        locations = ((LocatedFileStatus) stat).getBlockLocations();
573      } else {
574        FileSystem fs = stat.getPath().getFileSystem(conf);
575        locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
576      }
577      // create a list of all block and their locations
578      if (locations == null) {
579        blocks = new OneBlockInfo[0];
580      } else {
581
582        if(locations.length == 0 && !stat.isDirectory()) {
583          locations = new BlockLocation[] { new BlockLocation() };
584        }
585
586        if (!isSplitable) {
587          // if the file is not splitable, just create the one block with
588          // full file length
589          blocks = new OneBlockInfo[1];
590          fileSize = stat.getLen();
591          blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
592              locations[0].getHosts(), locations[0].getTopologyPaths());
593        } else {
594          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
595              locations.length);
596          for (int i = 0; i < locations.length; i++) {
597            fileSize += locations[i].getLength();
598
599            // each split can be a maximum of maxSize
600            long left = locations[i].getLength();
601            long myOffset = locations[i].getOffset();
602            long myLength = 0;
603            do {
604              if (maxSize == 0) {
605                myLength = left;
606              } else {
607                if (left > maxSize && left < 2 * maxSize) {
608                  // if remainder is between max and 2*max - then
609                  // instead of creating splits of size max, left-max we
610                  // create splits of size left/2 and left/2. This is
611                  // a heuristic to avoid creating really really small
612                  // splits.
613                  myLength = left / 2;
614                } else {
615                  myLength = Math.min(maxSize, left);
616                }
617              }
618              OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
619                  myOffset, myLength, locations[i].getHosts(),
620                  locations[i].getTopologyPaths());
621              left -= myLength;
622              myOffset += myLength;
623
624              blocksList.add(oneblock);
625            } while (left > 0);
626          }
627          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
628        }
629        
630        populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
631                          nodeToBlocks, rackToNodes);
632      }
633    }
634    
635    @VisibleForTesting
636    static void populateBlockInfo(OneBlockInfo[] blocks,
637                          Map<String, List<OneBlockInfo>> rackToBlocks,
638                          Map<OneBlockInfo, String[]> blockToNodes,
639                          Map<String, Set<OneBlockInfo>> nodeToBlocks,
640                          Map<String, Set<String>> rackToNodes) {
641      for (OneBlockInfo oneblock : blocks) {
642        // add this block to the block --> node locations map
643        blockToNodes.put(oneblock, oneblock.hosts);
644
645        // For blocks that do not have host/rack information,
646        // assign to default  rack.
647        String[] racks = null;
648        if (oneblock.hosts.length == 0) {
649          racks = new String[]{NetworkTopology.DEFAULT_RACK};
650        } else {
651          racks = oneblock.racks;
652        }
653
654        // add this block to the rack --> block map
655        for (int j = 0; j < racks.length; j++) {
656          String rack = racks[j];
657          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
658          if (blklist == null) {
659            blklist = new ArrayList<OneBlockInfo>();
660            rackToBlocks.put(rack, blklist);
661          }
662          blklist.add(oneblock);
663          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
664            // Add this host to rackToNodes map
665            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
666          }
667        }
668
669        // add this block to the node --> block map
670        for (int j = 0; j < oneblock.hosts.length; j++) {
671          String node = oneblock.hosts[j];
672          Set<OneBlockInfo> blklist = nodeToBlocks.get(node);
673          if (blklist == null) {
674            blklist = new LinkedHashSet<OneBlockInfo>();
675            nodeToBlocks.put(node, blklist);
676          }
677          blklist.add(oneblock);
678        }
679      }
680    }
681
682    long getLength() {
683      return fileSize;
684    }
685
686    OneBlockInfo[] getBlocks() {
687      return blocks;
688    }
689  }
690
691  /**
692   * information about one block from the File System
693   */
694  @VisibleForTesting
695  static class OneBlockInfo {
696    Path onepath;                // name of this file
697    long offset;                 // offset in file
698    long length;                 // length of this block
699    String[] hosts;              // nodes on which this block resides
700    String[] racks;              // network topology of hosts
701
702    OneBlockInfo(Path path, long offset, long len, 
703                 String[] hosts, String[] topologyPaths) {
704      this.onepath = path;
705      this.offset = offset;
706      this.hosts = hosts;
707      this.length = len;
708      assert (hosts.length == topologyPaths.length ||
709              topologyPaths.length == 0);
710
711      // if the file system does not have any rack information, then
712      // use dummy rack location.
713      if (topologyPaths.length == 0) {
714        topologyPaths = new String[hosts.length];
715        for (int i = 0; i < topologyPaths.length; i++) {
716          topologyPaths[i] = (new NodeBase(hosts[i], 
717                              NetworkTopology.DEFAULT_RACK)).toString();
718        }
719      }
720
721      // The topology paths have the host name included as the last 
722      // component. Strip it.
723      this.racks = new String[topologyPaths.length];
724      for (int i = 0; i < topologyPaths.length; i++) {
725        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
726      }
727    }
728  }
729
730  protected BlockLocation[] getFileBlockLocations(
731    FileSystem fs, FileStatus stat) throws IOException {
732    if (stat instanceof LocatedFileStatus) {
733      return ((LocatedFileStatus) stat).getBlockLocations();
734    }
735    return fs.getFileBlockLocations(stat, 0, stat.getLen());
736  }
737
738  private static void addHostToRack(Map<String, Set<String>> rackToNodes,
739                                    String rack, String host) {
740    Set<String> hosts = rackToNodes.get(rack);
741    if (hosts == null) {
742      hosts = new HashSet<String>();
743      rackToNodes.put(rack, hosts);
744    }
745    hosts.add(host);
746  }
747  
748  private Set<String> getHosts(Set<String> racks) {
749    Set<String> hosts = new HashSet<String>();
750    for (String rack : racks) {
751      if (rackToNodes.containsKey(rack)) {
752        hosts.addAll(rackToNodes.get(rack));
753      }
754    }
755    return hosts;
756  }
757  
758  /**
759   * Accept a path only if any one of filters given in the
760   * constructor do. 
761   */
762  private static class MultiPathFilter implements PathFilter {
763    private List<PathFilter> filters;
764
765    public MultiPathFilter() {
766      this.filters = new ArrayList<PathFilter>();
767    }
768
769    public MultiPathFilter(List<PathFilter> filters) {
770      this.filters = filters;
771    }
772
773    public void add(PathFilter one) {
774      filters.add(one);
775    }
776
777    public boolean accept(Path path) {
778      for (PathFilter filter : filters) {
779        if (filter.accept(path)) {
780          return true;
781        }
782      }
783      return false;
784    }
785
786    public String toString() {
787      StringBuffer buf = new StringBuffer();
788      buf.append("[");
789      for (PathFilter f: filters) {
790        buf.append(f);
791        buf.append(",");
792      }
793      buf.append("]");
794      return buf.toString();
795    }
796  }
797}