001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.input;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashSet;
025import java.util.List;
026import java.util.HashMap;
027import java.util.Set;
028import java.util.Iterator;
029import java.util.Map;
030
031import org.apache.hadoop.classification.InterfaceAudience;
032import org.apache.hadoop.classification.InterfaceStability;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.LocatedFileStatus;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.fs.BlockLocation;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.PathFilter;
040import org.apache.hadoop.io.compress.CompressionCodec;
041import org.apache.hadoop.io.compress.CompressionCodecFactory;
042import org.apache.hadoop.io.compress.SplittableCompressionCodec;
043import org.apache.hadoop.mapreduce.InputFormat;
044import org.apache.hadoop.mapreduce.InputSplit;
045import org.apache.hadoop.mapreduce.JobContext;
046import org.apache.hadoop.mapreduce.RecordReader;
047import org.apache.hadoop.mapreduce.TaskAttemptContext;
048import org.apache.hadoop.net.NodeBase;
049import org.apache.hadoop.net.NetworkTopology;
050
051/**
052 * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in 
053 * {@link InputFormat#getSplits(JobContext)} method. 
054 * 
055 * Splits are constructed from the files under the input paths. 
056 * A split cannot have files from different pools.
057 * Each split returned may contain blocks from different files.
058 * If a maxSplitSize is specified, then blocks on the same node are
059 * combined to form a single split. Blocks that are left over are
060 * then combined with other blocks in the same rack. 
061 * If maxSplitSize is not specified, then blocks from the same rack
062 * are combined in a single split; no attempt is made to create
063 * node-local splits.
064 * If the maxSplitSize is equal to the block size, then this class
065 * is similar to the default splitting behavior in Hadoop: each
066 * block is a locally processed split.
067 * Subclasses implement 
068 * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
069 * to construct <code>RecordReader</code>'s for 
070 * <code>CombineFileSplit</code>'s.
071 * 
072 * @see CombineFileSplit
073 */
074@InterfaceAudience.Public
075@InterfaceStability.Stable
076public abstract class CombineFileInputFormat<K, V>
077  extends FileInputFormat<K, V> {
078
079  public static final String SPLIT_MINSIZE_PERNODE = 
080    "mapreduce.input.fileinputformat.split.minsize.per.node";
081  public static final String SPLIT_MINSIZE_PERRACK = 
082    "mapreduce.input.fileinputformat.split.minsize.per.rack";
083  // ability to limit the size of a single split
084  private long maxSplitSize = 0;
085  private long minSplitSizeNode = 0;
086  private long minSplitSizeRack = 0;
087
088  // A pool of input paths filters. A split cannot have blocks from files
089  // across multiple pools.
  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
091
092  // mapping from a rack name to the set of Nodes in the rack 
093  private HashMap<String, Set<String>> rackToNodes = 
094                            new HashMap<String, Set<String>>();
095  /**
096   * Specify the maximum size (in bytes) of each split. Each split is
097   * approximately equal to the specified size.
098   */
099  protected void setMaxSplitSize(long maxSplitSize) {
100    this.maxSplitSize = maxSplitSize;
101  }
102
103  /**
104   * Specify the minimum size (in bytes) of each split per node.
105   * This applies to data that is left over after combining data on a single
106   * node into splits that are of maximum size specified by maxSplitSize.
107   * This leftover data will be combined into its own split if its size
108   * exceeds minSplitSizeNode.
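   * For example (illustrative numbers only), with a maxSplitSize of 256 MB
   * and roughly 300 MB of data on one node, one ~256 MB node-local split is
   * created and the remaining ~44 MB becomes its own node-local split only
   * if minSplitSizeNode is non-zero and no larger than that remainder;
   * otherwise the leftover blocks are deferred to rack-level combining.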
109   */
110  protected void setMinSplitSizeNode(long minSplitSizeNode) {
111    this.minSplitSizeNode = minSplitSizeNode;
112  }
113
114  /**
115   * Specify the minimum size (in bytes) of each split per rack.
116   * This applies to data that is left over after combining data on a single
117   * rack into splits that are of maximum size specified by maxSplitSize.
118   * This leftover data will be combined into its own split if its size
119   * exceeds minSplitSizeRack.
120   */
121  protected void setMinSplitSizeRack(long minSplitSizeRack) {
122    this.minSplitSizeRack = minSplitSizeRack;
123  }
124
125  /**
126   * Create a new pool and add the filters to it.
127   * A split cannot have files from different pools.
128   */
129  protected void createPool(List<PathFilter> filters) {
130    pools.add(new MultiPathFilter(filters));
131  }
132
133  /**
134   * Create a new pool and add the filters to it. 
135   * A pathname can satisfy any one of the specified filters.
136   * A split cannot have files from different pools.
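   *
   * For example (a sketch; the filter classes shown are hypothetical
   * {@link PathFilter} implementations), a subclass constructor might call
   * {@code createPool(new TextFilesFilter(), new SequenceFilesFilter())} so
   * that files accepted by either filter are combined only with each other.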
137   */
138  protected void createPool(PathFilter... filters) {
139    MultiPathFilter multi = new MultiPathFilter();
140    for (PathFilter f: filters) {
141      multi.add(f);
142    }
143    pools.add(multi);
144  }
145  
146  @Override
147  protected boolean isSplitable(JobContext context, Path file) {
148    final CompressionCodec codec =
149      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
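    // e.g. a .gz file resolves to GzipCodec, which is not splittable and
    // fails the instanceof check below, while a .bz2 file resolves to
    // BZip2Codec, which implements SplittableCompressionCodec.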
150    if (null == codec) {
151      return true;
152    }
153    return codec instanceof SplittableCompressionCodec;
154  }
155
156  /**
157   * default constructor
158   */
159  public CombineFileInputFormat() {
160  }
161
162  @Override
163  public List<InputSplit> getSplits(JobContext job) 
164    throws IOException {
165
166    long minSizeNode = 0;
167    long minSizeRack = 0;
168    long maxSize = 0;
169    Configuration conf = job.getConfiguration();
170
    // the values specified by the setXxxSplitSize() methods take precedence
    // over any values that might have been specified in the config
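    // (e.g. a mapreduce.input.fileinputformat.split.maxsize value in the job
    // configuration is ignored once setMaxSplitSize() has been called)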
173    if (minSplitSizeNode != 0) {
174      minSizeNode = minSplitSizeNode;
175    } else {
176      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
177    }
178    if (minSplitSizeRack != 0) {
179      minSizeRack = minSplitSizeRack;
180    } else {
181      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
182    }
183    if (maxSplitSize != 0) {
184      maxSize = maxSplitSize;
185    } else {
186      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
187    }
188    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
189      throw new IOException("Minimum split size pernode " + minSizeNode +
190                            " cannot be larger than maximum split size " +
191                            maxSize);
192    }
193    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
194      throw new IOException("Minimum split size per rack" + minSizeRack +
195                            " cannot be larger than maximum split size " +
196                            maxSize);
197    }
198    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
199      throw new IOException("Minimum split size per node" + minSizeNode +
200                            " cannot be smaller than minimum split " +
201                            "size per rack " + minSizeRack);
202    }
203
204    // all the files in input set
205    List<FileStatus> stats = listStatus(job);
206    List<InputSplit> splits = new ArrayList<InputSplit>();
207    if (stats.size() == 0) {
208      return splits;    
209    }
210
211    // In one single iteration, process all the paths in a single pool.
212    // Processing one pool at a time ensures that a split contains paths
213    // from a single pool only.
214    for (MultiPathFilter onepool : pools) {
215      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
216      
      // pick one input path. If it matches any of the filters in this pool,
      // add it to this pool's set of paths
219      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
220        FileStatus p = iter.next();
221        if (onepool.accept(p.getPath())) {
222          myPaths.add(p); // add it to my output set
223          iter.remove();
224        }
225      }
226      // create splits for all files in this pool.
227      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
228    }
229
230    // create splits for all files that are not in any pool.
231    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
232
233    // free up rackToNodes map
234    rackToNodes.clear();
235    return splits;    
236  }
237
238  /**
   * Create splits for the specified set of files and add them to the given
   * list of splits.
240   */
241  private void getMoreSplits(JobContext job, List<FileStatus> stats,
242                             long maxSize, long minSizeNode, long minSizeRack,
243                             List<InputSplit> splits)
244    throws IOException {
245    Configuration conf = job.getConfiguration();
246
247    // all blocks for all the files in input set
248    OneFileInfo[] files;
249  
250    // mapping from a rack name to the list of blocks it has
251    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
252                              new HashMap<String, List<OneBlockInfo>>();
253
254    // mapping from a block to the nodes on which it has replicas
255    HashMap<OneBlockInfo, String[]> blockToNodes = 
256                              new HashMap<OneBlockInfo, String[]>();
257
258    // mapping from a node to the list of blocks that it contains
259    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
260                              new HashMap<String, List<OneBlockInfo>>();
261    
262    files = new OneFileInfo[stats.size()];
263    if (stats.size() == 0) {
264      return; 
265    }
266
267    // populate all the blocks for all files
268    long totLength = 0;
269    int fileIdx = 0;
270    for (FileStatus stat : stats) {
271      files[fileIdx] = new OneFileInfo(stat, conf,
272                                 isSplitable(job, stat.getPath()),
273                                 rackToBlocks, blockToNodes, nodeToBlocks,
274                                 rackToNodes, maxSize);
      totLength += files[fileIdx].getLength();
      fileIdx++;
    }
277
278    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
279    Set<String> nodes = new HashSet<String>();
280    long curSplitSize = 0;
281
282    // process all nodes and create splits that are local
283    // to a node. 
284    for (Iterator<Map.Entry<String, 
285         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
286         iter.hasNext();) {
287
288      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
289      nodes.add(one.getKey());
290      List<OneBlockInfo> blocksInNode = one.getValue();
291
292      // for each block, copy it into validBlocks. Delete it from 
293      // blockToNodes so that the same block does not appear in 
294      // two different splits.
295      for (OneBlockInfo oneblock : blocksInNode) {
296        if (blockToNodes.containsKey(oneblock)) {
297          validBlocks.add(oneblock);
298          blockToNodes.remove(oneblock);
299          curSplitSize += oneblock.length;
300
301          // if the accumulated split size exceeds the maximum, then 
302          // create this split.
303          if (maxSize != 0 && curSplitSize >= maxSize) {
304            // create an input split and add it to the splits array
305            addCreatedSplit(splits, nodes, validBlocks);
306            curSplitSize = 0;
307            validBlocks.clear();
308          }
309        }
310      }
311      // if there were any blocks left over and their combined size is
    // larger than minSizeNode, then combine them into one split.
313      // Otherwise add them back to the unprocessed pool. It is likely 
314      // that they will be combined with other blocks from the 
315      // same rack later on.
316      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
317        // create an input split and add it to the splits array
318        addCreatedSplit(splits, nodes, validBlocks);
319      } else {
320        for (OneBlockInfo oneblock : validBlocks) {
321          blockToNodes.put(oneblock, oneblock.hosts);
322        }
323      }
324      validBlocks.clear();
325      nodes.clear();
326      curSplitSize = 0;
327    }
328
329    // if blocks in a rack are below the specified minimum size, then keep them
330    // in 'overflow'. After the processing of all racks is complete, these 
331    // overflow blocks will be combined into splits.
332    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
333    Set<String> racks = new HashSet<String>();
334
335    // Process all racks over and over again until there is no more work to do.
336    while (blockToNodes.size() > 0) {
337
338      // Create one split for this rack before moving over to the next rack. 
339      // Come back to this rack after creating a single split for each of the 
340      // remaining racks.
      // Process one rack location at a time: combine all possible blocks that
      // reside on this rack into one split (constrained by the minimum and
      // maximum split sizes).
344
345      // iterate over all racks 
346      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
347           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
348
349        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
350        racks.add(one.getKey());
351        List<OneBlockInfo> blocks = one.getValue();
352
353        // for each block, copy it into validBlocks. Delete it from 
354        // blockToNodes so that the same block does not appear in 
355        // two different splits.
356        boolean createdSplit = false;
357        for (OneBlockInfo oneblock : blocks) {
358          if (blockToNodes.containsKey(oneblock)) {
359            validBlocks.add(oneblock);
360            blockToNodes.remove(oneblock);
361            curSplitSize += oneblock.length;
362      
363            // if the accumulated split size exceeds the maximum, then 
364            // create this split.
365            if (maxSize != 0 && curSplitSize >= maxSize) {
366              // create an input split and add it to the splits array
367              addCreatedSplit(splits, getHosts(racks), validBlocks);
368              createdSplit = true;
369              break;
370            }
371          }
372        }
373
374        // if we created a split, then just go to the next rack
375        if (createdSplit) {
376          curSplitSize = 0;
377          validBlocks.clear();
378          racks.clear();
379          continue;
380        }
381
382        if (!validBlocks.isEmpty()) {
383          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
384            // if there is a minimum size specified, then create a single split
385            // otherwise, store these blocks into overflow data structure
386            addCreatedSplit(splits, getHosts(racks), validBlocks);
387          } else {
388            // There were a few blocks in this rack that 
389                // remained to be processed. Keep them in 'overflow' block list. 
390                // These will be combined later.
391            overflowBlocks.addAll(validBlocks);
392          }
393        }
394        curSplitSize = 0;
395        validBlocks.clear();
396        racks.clear();
397      }
398    }
399
400    assert blockToNodes.isEmpty();
401    assert curSplitSize == 0;
402    assert validBlocks.isEmpty();
403    assert racks.isEmpty();
404
405    // Process all overflow blocks
406    for (OneBlockInfo oneblock : overflowBlocks) {
407      validBlocks.add(oneblock);
408      curSplitSize += oneblock.length;
409
      // This might cause an existing rack location to be re-added,
      // but it should be ok.
412      for (int i = 0; i < oneblock.racks.length; i++) {
413        racks.add(oneblock.racks[i]);
414      }
415
416      // if the accumulated split size exceeds the maximum, then 
417      // create this split.
418      if (maxSize != 0 && curSplitSize >= maxSize) {
419        // create an input split and add it to the splits array
420        addCreatedSplit(splits, getHosts(racks), validBlocks);
421        curSplitSize = 0;
422        validBlocks.clear();
423        racks.clear();
424      }
425    }
426
    // Process any remaining blocks.
428    if (!validBlocks.isEmpty()) {
429      addCreatedSplit(splits, getHosts(racks), validBlocks);
430    }
431  }
432
433  /**
   * Create a single split from the list of blocks specified in validBlocks
   * and add this new split to splitList.
436   */
437  private void addCreatedSplit(List<InputSplit> splitList, 
438                               Collection<String> locations, 
439                               ArrayList<OneBlockInfo> validBlocks) {
440    // create an input split
441    Path[] fl = new Path[validBlocks.size()];
442    long[] offset = new long[validBlocks.size()];
443    long[] length = new long[validBlocks.size()];
444    for (int i = 0; i < validBlocks.size(); i++) {
445      fl[i] = validBlocks.get(i).onepath; 
446      offset[i] = validBlocks.get(i).offset;
447      length[i] = validBlocks.get(i).length;
448    }
449
450     // add this split to the list that is returned
451    CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
452                                   length, locations.toArray(new String[0]));
453    splitList.add(thissplit); 
454  }
455
456  /**
457   * This is not implemented yet. 
458   */
459  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
460      TaskAttemptContext context) throws IOException;
461
462  /**
463   * information about one file from the File System
464   */
465  private static class OneFileInfo {
466    private long fileSize;               // size of the file
467    private OneBlockInfo[] blocks;       // all blocks in this file
468
469    OneFileInfo(FileStatus stat, Configuration conf,
470                boolean isSplitable,
471                HashMap<String, List<OneBlockInfo>> rackToBlocks,
472                HashMap<OneBlockInfo, String[]> blockToNodes,
473                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
474                HashMap<String, Set<String>> rackToNodes,
475                long maxSize)
476                throws IOException {
477      this.fileSize = 0;
478
479      // get block locations from file system
480      BlockLocation[] locations;
481      if (stat instanceof LocatedFileStatus) {
482        locations = ((LocatedFileStatus) stat).getBlockLocations();
483      } else {
484        FileSystem fs = stat.getPath().getFileSystem(conf);
485        locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
486      }
      // create a list of all blocks and their locations
488      if (locations == null) {
489        blocks = new OneBlockInfo[0];
490      } else {
491
492        if(locations.length == 0) {
493          locations = new BlockLocation[] { new BlockLocation() };
494        }
495
496        if (!isSplitable) {
497          // if the file is not splitable, just create the one block with
498          // full file length
499          blocks = new OneBlockInfo[1];
500          fileSize = stat.getLen();
501          blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
502              locations[0].getHosts(), locations[0].getTopologyPaths());
503        } else {
504          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
505              locations.length);
506          for (int i = 0; i < locations.length; i++) {
507            fileSize += locations[i].getLength();
508
            // break this block into chunks of at most maxSize each
510            long left = locations[i].getLength();
511            long myOffset = locations[i].getOffset();
512            long myLength = 0;
513            do {
514              if (maxSize == 0) {
515                myLength = left;
516              } else {
517                if (left > maxSize && left < 2 * maxSize) {
                  // if the remainder is between maxSize and 2 * maxSize, then
                  // instead of creating chunks of size maxSize and
                  // (left - maxSize), create two chunks of size left / 2 each.
                  // This is a heuristic to avoid creating very small chunks.
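                  // For example, with maxSize = 100 and left = 150, this
                  // produces two chunks of 75 each instead of 100 and 50.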
523                  myLength = left / 2;
524                } else {
525                  myLength = Math.min(maxSize, left);
526                }
527              }
528              OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
529                  myOffset, myLength, locations[i].getHosts(),
530                  locations[i].getTopologyPaths());
531              left -= myLength;
532              myOffset += myLength;
533
534              blocksList.add(oneblock);
535            } while (left > 0);
536          }
537          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
538        }
539
540        for (OneBlockInfo oneblock : blocks) {
541          // add this block to the block --> node locations map
542          blockToNodes.put(oneblock, oneblock.hosts);
543
          // For blocks that do not have host/rack information,
          // assign to the default rack.
546          String[] racks = null;
547          if (oneblock.hosts.length == 0) {
548            racks = new String[]{NetworkTopology.DEFAULT_RACK};
549          } else {
550            racks = oneblock.racks;
551          }
552
553          // add this block to the rack --> block map
554          for (int j = 0; j < racks.length; j++) {
555            String rack = racks[j];
556            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
557            if (blklist == null) {
558              blklist = new ArrayList<OneBlockInfo>();
559              rackToBlocks.put(rack, blklist);
560            }
561            blklist.add(oneblock);
562            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
563              // Add this host to rackToNodes map
564              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
565            }
566          }
567
568          // add this block to the node --> block map
569          for (int j = 0; j < oneblock.hosts.length; j++) {
570            String node = oneblock.hosts[j];
571            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
572            if (blklist == null) {
573              blklist = new ArrayList<OneBlockInfo>();
574              nodeToBlocks.put(node, blklist);
575            }
576            blklist.add(oneblock);
577          }
578        }
579      }
580    }
581
582    long getLength() {
583      return fileSize;
584    }
585
586    OneBlockInfo[] getBlocks() {
587      return blocks;
588    }
589  }
590
591  /**
592   * information about one block from the File System
593   */
594  private static class OneBlockInfo {
595    Path onepath;                // name of this file
596    long offset;                 // offset in file
597    long length;                 // length of this block
598    String[] hosts;              // nodes on which this block resides
599    String[] racks;              // network topology of hosts
600
601    OneBlockInfo(Path path, long offset, long len, 
602                 String[] hosts, String[] topologyPaths) {
603      this.onepath = path;
604      this.offset = offset;
605      this.hosts = hosts;
606      this.length = len;
607      assert (hosts.length == topologyPaths.length ||
608              topologyPaths.length == 0);
609
      // if the file system does not have any rack information, then
      // use a dummy rack location.
612      if (topologyPaths.length == 0) {
613        topologyPaths = new String[hosts.length];
614        for (int i = 0; i < topologyPaths.length; i++) {
615          topologyPaths[i] = (new NodeBase(hosts[i], 
616                              NetworkTopology.DEFAULT_RACK)).toString();
617        }
618      }
619
620      // The topology paths have the host name included as the last 
621      // component. Strip it.
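      // e.g. a topology path of "/rack1/host1" yields the rack "/rack1".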
622      this.racks = new String[topologyPaths.length];
623      for (int i = 0; i < topologyPaths.length; i++) {
624        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
625      }
626    }
627  }
628
629  protected BlockLocation[] getFileBlockLocations(
630    FileSystem fs, FileStatus stat) throws IOException {
631    if (stat instanceof LocatedFileStatus) {
632      return ((LocatedFileStatus) stat).getBlockLocations();
633    }
634    return fs.getFileBlockLocations(stat, 0, stat.getLen());
635  }
636
637  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
638                                    String rack, String host) {
639    Set<String> hosts = rackToNodes.get(rack);
640    if (hosts == null) {
641      hosts = new HashSet<String>();
642      rackToNodes.put(rack, hosts);
643    }
644    hosts.add(host);
645  }
646  
647  private Set<String> getHosts(Set<String> racks) {
648    Set<String> hosts = new HashSet<String>();
649    for (String rack : racks) {
650      if (rackToNodes.containsKey(rack)) {
651        hosts.addAll(rackToNodes.get(rack));
652      }
653    }
654    return hosts;
655  }
656  
657  /**
   * Accept a path only if at least one of the filters given in the
   * constructor accepts it.
660   */
661  private static class MultiPathFilter implements PathFilter {
662    private List<PathFilter> filters;
663
664    public MultiPathFilter() {
665      this.filters = new ArrayList<PathFilter>();
666    }
667
668    public MultiPathFilter(List<PathFilter> filters) {
669      this.filters = filters;
670    }
671
672    public void add(PathFilter one) {
673      filters.add(one);
674    }
675
676    public boolean accept(Path path) {
677      for (PathFilter filter : filters) {
678        if (filter.accept(path)) {
679          return true;
680        }
681      }
682      return false;
683    }
684
685    public String toString() {
686      StringBuffer buf = new StringBuffer();
687      buf.append("[");
688      for (PathFilter f: filters) {
689        buf.append(f);
690        buf.append(",");
691      }
692      buf.append("]");
693      return buf.toString();
694    }
695  }
696}