001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.input;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.Collection;
024 import java.util.LinkedList;
025 import java.util.HashSet;
026 import java.util.List;
027 import java.util.HashMap;
028 import java.util.Set;
029 import java.util.Iterator;
030 import java.util.Map;
031
032 import org.apache.hadoop.classification.InterfaceAudience;
033 import org.apache.hadoop.classification.InterfaceStability;
034 import org.apache.hadoop.conf.Configuration;
035 import org.apache.hadoop.fs.FileSystem;
036 import org.apache.hadoop.fs.FileUtil;
037 import org.apache.hadoop.fs.Path;
038 import org.apache.hadoop.fs.BlockLocation;
039 import org.apache.hadoop.fs.FileStatus;
040 import org.apache.hadoop.fs.PathFilter;
041 import org.apache.hadoop.io.compress.CompressionCodec;
042 import org.apache.hadoop.io.compress.CompressionCodecFactory;
043 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
044 import org.apache.hadoop.mapreduce.InputFormat;
045 import org.apache.hadoop.mapreduce.InputSplit;
046 import org.apache.hadoop.mapreduce.JobContext;
047 import org.apache.hadoop.mapreduce.RecordReader;
048 import org.apache.hadoop.mapreduce.TaskAttemptContext;
049 import org.apache.hadoop.net.NodeBase;
050 import org.apache.hadoop.net.NetworkTopology;
051
052 import com.google.common.annotations.VisibleForTesting;
053
054 /**
055 * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
056 * {@link InputFormat#getSplits(JobContext)} method.
057 *
058 * Splits are constructed from the files under the input paths.
059 * A split cannot have files from different pools.
060 * Each split returned may contain blocks from different files.
061 * If a maxSplitSize is specified, then blocks on the same node are
062 * combined to form a single split. Blocks that are left over are
063 * then combined with other blocks in the same rack.
064 * If maxSplitSize is not specified, then blocks from the same rack
065 * are combined in a single split; no attempt is made to create
066 * node-local splits.
067 * If the maxSplitSize is equal to the block size, then this class
068 * is similar to the default splitting behavior in Hadoop: each
069 * block is a locally processed split.
070 * Subclasses implement
071 * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext)}
072 * to construct <code>RecordReader</code>'s for
073 * <code>CombineFileSplit</code>'s.
074 *
075 * @see CombineFileSplit
076 */
077 @InterfaceAudience.Public
078 @InterfaceStability.Stable
079 public abstract class CombineFileInputFormat<K, V>
080 extends FileInputFormat<K, V> {
081
/** Configuration key: minimum size (in bytes) of a node-local split. */
public static final String SPLIT_MINSIZE_PERNODE = 
  "mapreduce.input.fileinputformat.split.minsize.per.node";
/** Configuration key: minimum size (in bytes) of a rack-local split. */
public static final String SPLIT_MINSIZE_PERRACK = 
  "mapreduce.input.fileinputformat.split.minsize.per.rack";

// Split-size limits set programmatically through the setters below.
// A value of 0 means "not set"; getSplits() then falls back to the
// corresponding configuration value.
private long maxSplitSize = 0;
private long minSplitSizeNode = 0;
private long minSplitSizeRack = 0;

// A pool of input paths filters. A split cannot have blocks from files
// across multiple pools.
private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();

// Mapping from a rack name to the set of nodes in the rack. It is filled
// while block information is collected and emptied again at the end of
// getSplits().
private HashMap<String, Set<String>> rackToNodes = 
                          new HashMap<String, Set<String>>();
098 /**
099 * Specify the maximum size (in bytes) of each split. Each split is
100 * approximately equal to the specified size.
101 */
102 protected void setMaxSplitSize(long maxSplitSize) {
103 this.maxSplitSize = maxSplitSize;
104 }
105
106 /**
107 * Specify the minimum size (in bytes) of each split per node.
108 * This applies to data that is left over after combining data on a single
109 * node into splits that are of maximum size specified by maxSplitSize.
110 * This leftover data will be combined into its own split if its size
111 * exceeds minSplitSizeNode.
112 */
113 protected void setMinSplitSizeNode(long minSplitSizeNode) {
114 this.minSplitSizeNode = minSplitSizeNode;
115 }
116
117 /**
118 * Specify the minimum size (in bytes) of each split per rack.
119 * This applies to data that is left over after combining data on a single
120 * rack into splits that are of maximum size specified by maxSplitSize.
121 * This leftover data will be combined into its own split if its size
122 * exceeds minSplitSizeRack.
123 */
124 protected void setMinSplitSizeRack(long minSplitSizeRack) {
125 this.minSplitSizeRack = minSplitSizeRack;
126 }
127
128 /**
129 * Create a new pool and add the filters to it.
130 * A split cannot have files from different pools.
131 */
132 protected void createPool(List<PathFilter> filters) {
133 pools.add(new MultiPathFilter(filters));
134 }
135
136 /**
137 * Create a new pool and add the filters to it.
138 * A pathname can satisfy any one of the specified filters.
139 * A split cannot have files from different pools.
140 */
141 protected void createPool(PathFilter... filters) {
142 MultiPathFilter multi = new MultiPathFilter();
143 for (PathFilter f: filters) {
144 multi.add(f);
145 }
146 pools.add(multi);
147 }
148
149 @Override
150 protected boolean isSplitable(JobContext context, Path file) {
151 final CompressionCodec codec =
152 new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
153 if (null == codec) {
154 return true;
155 }
156 return codec instanceof SplittableCompressionCodec;
157 }
158
/**
 * Default constructor. It performs no initialization of its own; split
 * size limits and pools keep their field defaults until changed through
 * the setters and <code>createPool</code> methods.
 */
public CombineFileInputFormat() {
}
164
165 @Override
166 public List<InputSplit> getSplits(JobContext job)
167 throws IOException {
168 long minSizeNode = 0;
169 long minSizeRack = 0;
170 long maxSize = 0;
171 Configuration conf = job.getConfiguration();
172
173 // the values specified by setxxxSplitSize() takes precedence over the
174 // values that might have been specified in the config
175 if (minSplitSizeNode != 0) {
176 minSizeNode = minSplitSizeNode;
177 } else {
178 minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
179 }
180 if (minSplitSizeRack != 0) {
181 minSizeRack = minSplitSizeRack;
182 } else {
183 minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
184 }
185 if (maxSplitSize != 0) {
186 maxSize = maxSplitSize;
187 } else {
188 maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
189 }
190 if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
191 throw new IOException("Minimum split size pernode " + minSizeNode +
192 " cannot be larger than maximum split size " +
193 maxSize);
194 }
195 if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
196 throw new IOException("Minimum split size per rack" + minSizeRack +
197 " cannot be larger than maximum split size " +
198 maxSize);
199 }
200 if (minSizeRack != 0 && minSizeNode > minSizeRack) {
201 throw new IOException("Minimum split size per node" + minSizeNode +
202 " cannot be smaller than minimum split " +
203 "size per rack " + minSizeRack);
204 }
205
206 // all the files in input set
207 Path[] paths = FileUtil.stat2Paths(
208 listStatus(job).toArray(new FileStatus[0]));
209 List<InputSplit> splits = new ArrayList<InputSplit>();
210 if (paths.length == 0) {
211 return splits;
212 }
213
214 // Convert them to Paths first. This is a costly operation and
215 // we should do it first, otherwise we will incur doing it multiple
216 // times, one time each for each pool in the next loop.
217 List<Path> newpaths = new LinkedList<Path>();
218 for (int i = 0; i < paths.length; i++) {
219 FileSystem fs = paths[i].getFileSystem(conf);
220 Path p = fs.makeQualified(paths[i]);
221 newpaths.add(p);
222 }
223
224 // In one single iteration, process all the paths in a single pool.
225 // Processing one pool at a time ensures that a split contains paths
226 // from a single pool only.
227 for (MultiPathFilter onepool : pools) {
228 ArrayList<Path> myPaths = new ArrayList<Path>();
229
230 // pick one input path. If it matches all the filters in a pool,
231 // add it to the output set
232 for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
233 Path p = iter.next();
234 if (onepool.accept(p)) {
235 myPaths.add(p); // add it to my output set
236 iter.remove();
237 }
238 }
239 // create splits for all files in this pool.
240 getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
241 maxSize, minSizeNode, minSizeRack, splits);
242 }
243
244 // create splits for all files that are not in any pool.
245 getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
246 maxSize, minSizeNode, minSizeRack, splits);
247
248 // free up rackToNodes map
249 rackToNodes.clear();
250 return splits;
251 }
252
253 /**
254 * Return all the splits in the specified set of paths
255 */
256 private void getMoreSplits(JobContext job, Path[] paths,
257 long maxSize, long minSizeNode, long minSizeRack,
258 List<InputSplit> splits)
259 throws IOException {
260 Configuration conf = job.getConfiguration();
261
262 // all blocks for all the files in input set
263 OneFileInfo[] files;
264
265 // mapping from a rack name to the list of blocks it has
266 HashMap<String, List<OneBlockInfo>> rackToBlocks =
267 new HashMap<String, List<OneBlockInfo>>();
268
269 // mapping from a block to the nodes on which it has replicas
270 HashMap<OneBlockInfo, String[]> blockToNodes =
271 new HashMap<OneBlockInfo, String[]>();
272
273 // mapping from a node to the list of blocks that it contains
274 HashMap<String, List<OneBlockInfo>> nodeToBlocks =
275 new HashMap<String, List<OneBlockInfo>>();
276
277 files = new OneFileInfo[paths.length];
278 if (paths.length == 0) {
279 return;
280 }
281
282 // populate all the blocks for all files
283 long totLength = 0;
284 for (int i = 0; i < paths.length; i++) {
285 files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
286 rackToBlocks, blockToNodes, nodeToBlocks,
287 rackToNodes, maxSize);
288 totLength += files[i].getLength();
289 }
290 createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
291 maxSize, minSizeNode, minSizeRack, splits);
292 }
293
/**
 * Groups blocks into splits and appends the splits to <code>splits</code>.
 *
 * The work is done in three phases:
 * <ol>
 * <li>Node phase: for every node, blocks still present in
 * <code>blockToNodes</code> are accumulated into node-local splits. The
 * number of splits created on one node per round is capped, and rounds are
 * repeated while some node hit the cap and data remains.</li>
 * <li>Rack phase: the remaining blocks are accumulated per rack, one split
 * per rack per pass, until <code>blockToNodes</code> is empty. Leftovers
 * of a rack that are too small are moved to an overflow list.</li>
 * <li>Overflow phase: the overflow blocks are combined into splits
 * regardless of rack.</li>
 * </ol>
 *
 * @param nodeToBlocks mapping from a node to the blocks it holds
 * @param blockToNodes mapping from a block to the nodes holding it; serves
 *        as the set of not yet assigned blocks and is emptied by this method
 * @param rackToBlocks mapping from a rack to the blocks it holds
 * @param totLength combined length of all blocks
 * @param maxSize maximum split size in bytes, 0 for no limit
 * @param minSizeNode minimum node-local split size in bytes, 0 if unset
 * @param minSizeRack minimum rack-local split size in bytes, 0 if unset
 * @param splits the list that receives the new splits
 */
@VisibleForTesting
void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
                   HashMap<OneBlockInfo, String[]> blockToNodes,
                   HashMap<String, List<OneBlockInfo>> rackToBlocks,
                   long totLength,
                   long maxSize,
                   long minSizeNode,
                   long minSizeRack,
                   List<InputSplit> splits                     
                  ) {
  // blocks collected for the split currently being assembled
  ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
  // locations reported for a node-local split
  Set<String> nodes = new HashSet<String>();
  long curSplitSize = 0;
  
  // Starts as the number of all nodes; in later rounds it is the number of
  // nodes that reached the per-node split cap in the previous round.
  int numNodes = nodeToBlocks.size();
  // length of the data not yet placed in a split by the node phase
  long totalLength = totLength;

  while(true) {
    // it is allowed for maxSize to be 0. Disable smoothing load for such cases
    int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
        ((int) (totalLength/maxSize))/numNodes
        : Integer.MAX_VALUE;
    // always allow at least one split per node in a round
    int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
    numNodes = 0;

    // process all nodes and create splits that are local to a node.
    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
        .entrySet().iterator(); iter.hasNext();) {
      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
      nodes.add(one.getKey());
      List<OneBlockInfo> blocksInNode = one.getValue();

      // for each block, copy it into validBlocks. Delete it from
      // blockToNodes so that the same block does not appear in
      // two different splits.
      int splitsInNode = 0;
      for (OneBlockInfo oneblock : blocksInNode) {
        if (blockToNodes.containsKey(oneblock)) {
          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size reaches or exceeds the maximum,
          // then create this split.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, nodes, validBlocks);
            totalLength -= curSplitSize;
            curSplitSize = 0;
            validBlocks.clear();
            splitsInNode++;
            if (splitsInNode == maxSplitsByNodeOnly) {
              // stop grouping on a node so as not to create
              // disproportionately more splits on a node because it happens
              // to have many blocks
              // consider only these nodes in next round of grouping because
              // they have leftover blocks that may need to be grouped
              numNodes++;
              break;
            }
          }
        }
      }
      // if there were any blocks left over and their combined size is
      // at least minSizeNode, then combine them into one split.
      // Otherwise add them back to the unprocessed pool. It is likely
      // that they will be combined with other blocks from the
      // same rack later on.
      if (minSizeNode != 0 && curSplitSize >= minSizeNode
          && splitsInNode == 0) {
        // haven't created any split on this machine. so its ok to add a
        // smaller
        // one for parallelism. Otherwise group it in the rack for balanced
        // size
        // create an input split and add it to the splits array
        addCreatedSplit(splits, nodes, validBlocks);
        totalLength -= curSplitSize;
      } else {
        // hand the leftover blocks back so that a later round or the rack
        // phase can pick them up
        for (OneBlockInfo oneblock : validBlocks) {
          blockToNodes.put(oneblock, oneblock.hosts);
        }
      }
      validBlocks.clear();
      nodes.clear();
      curSplitSize = 0;
    }
    
    // another round is only useful if some node reached its cap and there
    // is still data outside of the node-local splits
    if(!(numNodes>0 && totalLength>0)) {
      break;
    }
  }

  // if blocks in a rack are below the specified minimum size, then keep them
  // in 'overflow'. After the processing of all racks is complete, these 
  // overflow blocks will be combined into splits.
  ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
  Set<String> racks = new HashSet<String>();

  // Process all racks over and over again until there is no more work to do.
  while (blockToNodes.size() > 0) {

    // Create one split for this rack before moving over to the next rack. 
    // Come back to this rack after creating a single split for each of the 
    // remaining racks.
    // Process one rack location at a time, Combine all possible blocks that
    // reside on this rack as one split. (constrained by minimum and maximum
    // split size).

    // iterate over all racks 
    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
         rackToBlocks.entrySet().iterator(); iter.hasNext();) {

      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
      racks.add(one.getKey());
      List<OneBlockInfo> blocks = one.getValue();

      // for each block, copy it into validBlocks. Delete it from 
      // blockToNodes so that the same block does not appear in 
      // two different splits.
      boolean createdSplit = false;
      for (OneBlockInfo oneblock : blocks) {
        if (blockToNodes.containsKey(oneblock)) {
          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;
    
          // if the accumulated split size reaches or exceeds the maximum,
          // then create this split.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, getHosts(racks), validBlocks);
            createdSplit = true;
            break;
          }
        }
      }

      // if we created a split, then just go to the next rack
      if (createdSplit) {
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
        continue;
      }

      if (!validBlocks.isEmpty()) {
        if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
          // if there is a minimum size specified, then create a single split
          // otherwise, store these blocks into overflow data structure
          addCreatedSplit(splits, getHosts(racks), validBlocks);
        } else {
          // There were a few blocks in this rack that 
          // remained to be processed. Keep them in 'overflow' block list. 
          // These will be combined later.
          overflowBlocks.addAll(validBlocks);
        }
      }
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
    }
  }

  // the rack phase leaves no partial state behind
  assert blockToNodes.isEmpty();
  assert curSplitSize == 0;
  assert validBlocks.isEmpty();
  assert racks.isEmpty();

  // Process all overflow blocks
  for (OneBlockInfo oneblock : overflowBlocks) {
    validBlocks.add(oneblock);
    curSplitSize += oneblock.length;

    // This might cause an existing rack location to be re-added,
    // but it should be ok.
    for (int i = 0; i < oneblock.racks.length; i++) {
      racks.add(oneblock.racks[i]);
    }

    // if the accumulated split size reaches or exceeds the maximum, then
    // create this split.
    if (maxSize != 0 && curSplitSize >= maxSize) {
      // create an input split and add it to the splits array
      addCreatedSplit(splits, getHosts(racks), validBlocks);
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
    }
  }

  // Process any remaining blocks, if any.
  if (!validBlocks.isEmpty()) {
    addCreatedSplit(splits, getHosts(racks), validBlocks);
  }
}
489
490 /**
491 * Create a single split from the list of blocks specified in validBlocks
492 * Add this new split into splitList.
493 */
494 private void addCreatedSplit(List<InputSplit> splitList,
495 Collection<String> locations,
496 ArrayList<OneBlockInfo> validBlocks) {
497 // create an input split
498 Path[] fl = new Path[validBlocks.size()];
499 long[] offset = new long[validBlocks.size()];
500 long[] length = new long[validBlocks.size()];
501 for (int i = 0; i < validBlocks.size(); i++) {
502 fl[i] = validBlocks.get(i).onepath;
503 offset[i] = validBlocks.get(i).offset;
504 length[i] = validBlocks.get(i).length;
505 }
506 // add this split to the list that is returned
507 CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
508 length, locations.toArray(new String[0]));
509 splitList.add(thissplit);
510 }
511
/**
 * Creates the record reader for a split. This class leaves the method
 * abstract; subclasses implement it to construct
 * <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
 */
public abstract RecordReader<K, V> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException;
517
518 /**
519 * information about one file from the File System
520 */
521 @VisibleForTesting
522 static class OneFileInfo {
523 private long fileSize; // size of the file
524 private OneBlockInfo[] blocks; // all blocks in this file
525
526 OneFileInfo(Path path, Configuration conf,
527 boolean isSplitable,
528 HashMap<String, List<OneBlockInfo>> rackToBlocks,
529 HashMap<OneBlockInfo, String[]> blockToNodes,
530 HashMap<String, List<OneBlockInfo>> nodeToBlocks,
531 HashMap<String, Set<String>> rackToNodes,
532 long maxSize)
533 throws IOException {
534 this.fileSize = 0;
535
536 // get block locations from file system
537 FileSystem fs = path.getFileSystem(conf);
538 FileStatus stat = fs.getFileStatus(path);
539 BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
540 stat.getLen());
541 // create a list of all block and their locations
542 if (locations == null) {
543 blocks = new OneBlockInfo[0];
544 } else {
545
546 if(locations.length == 0) {
547 locations = new BlockLocation[] { new BlockLocation() };
548 }
549
550 if (!isSplitable) {
551 // if the file is not splitable, just create the one block with
552 // full file length
553 blocks = new OneBlockInfo[1];
554 fileSize = stat.getLen();
555 blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
556 .getHosts(), locations[0].getTopologyPaths());
557 } else {
558 ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
559 locations.length);
560 for (int i = 0; i < locations.length; i++) {
561 fileSize += locations[i].getLength();
562
563 // each split can be a maximum of maxSize
564 long left = locations[i].getLength();
565 long myOffset = locations[i].getOffset();
566 long myLength = 0;
567 do {
568 if (maxSize == 0) {
569 myLength = left;
570 } else {
571 if (left > maxSize && left < 2 * maxSize) {
572 // if remainder is between max and 2*max - then
573 // instead of creating splits of size max, left-max we
574 // create splits of size left/2 and left/2. This is
575 // a heuristic to avoid creating really really small
576 // splits.
577 myLength = left / 2;
578 } else {
579 myLength = Math.min(maxSize, left);
580 }
581 }
582 OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
583 myLength, locations[i].getHosts(), locations[i]
584 .getTopologyPaths());
585 left -= myLength;
586 myOffset += myLength;
587
588 blocksList.add(oneblock);
589 } while (left > 0);
590 }
591 blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
592 }
593
594 populateBlockInfo(blocks, rackToBlocks, blockToNodes,
595 nodeToBlocks, rackToNodes);
596 }
597 }
598
599 @VisibleForTesting
600 static void populateBlockInfo(OneBlockInfo[] blocks,
601 HashMap<String, List<OneBlockInfo>> rackToBlocks,
602 HashMap<OneBlockInfo, String[]> blockToNodes,
603 HashMap<String, List<OneBlockInfo>> nodeToBlocks,
604 HashMap<String, Set<String>> rackToNodes) {
605 for (OneBlockInfo oneblock : blocks) {
606 // add this block to the block --> node locations map
607 blockToNodes.put(oneblock, oneblock.hosts);
608
609 // For blocks that do not have host/rack information,
610 // assign to default rack.
611 String[] racks = null;
612 if (oneblock.hosts.length == 0) {
613 racks = new String[]{NetworkTopology.DEFAULT_RACK};
614 } else {
615 racks = oneblock.racks;
616 }
617
618 // add this block to the rack --> block map
619 for (int j = 0; j < racks.length; j++) {
620 String rack = racks[j];
621 List<OneBlockInfo> blklist = rackToBlocks.get(rack);
622 if (blklist == null) {
623 blklist = new ArrayList<OneBlockInfo>();
624 rackToBlocks.put(rack, blklist);
625 }
626 blklist.add(oneblock);
627 if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
628 // Add this host to rackToNodes map
629 addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
630 }
631 }
632
633 // add this block to the node --> block map
634 for (int j = 0; j < oneblock.hosts.length; j++) {
635 String node = oneblock.hosts[j];
636 List<OneBlockInfo> blklist = nodeToBlocks.get(node);
637 if (blklist == null) {
638 blklist = new ArrayList<OneBlockInfo>();
639 nodeToBlocks.put(node, blklist);
640 }
641 blklist.add(oneblock);
642 }
643 }
644 }
645
646 long getLength() {
647 return fileSize;
648 }
649
650 OneBlockInfo[] getBlocks() {
651 return blocks;
652 }
653 }
654
655 /**
656 * information about one block from the File System
657 */
658 @VisibleForTesting
659 static class OneBlockInfo {
660 Path onepath; // name of this file
661 long offset; // offset in file
662 long length; // length of this block
663 String[] hosts; // nodes on which this block resides
664 String[] racks; // network topology of hosts
665
666 OneBlockInfo(Path path, long offset, long len,
667 String[] hosts, String[] topologyPaths) {
668 this.onepath = path;
669 this.offset = offset;
670 this.hosts = hosts;
671 this.length = len;
672 assert (hosts.length == topologyPaths.length ||
673 topologyPaths.length == 0);
674
675 // if the file system does not have any rack information, then
676 // use dummy rack location.
677 if (topologyPaths.length == 0) {
678 topologyPaths = new String[hosts.length];
679 for (int i = 0; i < topologyPaths.length; i++) {
680 topologyPaths[i] = (new NodeBase(hosts[i],
681 NetworkTopology.DEFAULT_RACK)).toString();
682 }
683 }
684
685 // The topology paths have the host name included as the last
686 // component. Strip it.
687 this.racks = new String[topologyPaths.length];
688 for (int i = 0; i < topologyPaths.length; i++) {
689 this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
690 }
691 }
692 }
693
694 protected BlockLocation[] getFileBlockLocations(
695 FileSystem fs, FileStatus stat) throws IOException {
696 return fs.getFileBlockLocations(stat, 0, stat.getLen());
697 }
698
699 private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
700 String rack, String host) {
701 Set<String> hosts = rackToNodes.get(rack);
702 if (hosts == null) {
703 hosts = new HashSet<String>();
704 rackToNodes.put(rack, hosts);
705 }
706 hosts.add(host);
707 }
708
709 private Set<String> getHosts(Set<String> racks) {
710 Set<String> hosts = new HashSet<String>();
711 for (String rack : racks) {
712 if (rackToNodes.containsKey(rack)) {
713 hosts.addAll(rackToNodes.get(rack));
714 }
715 }
716 return hosts;
717 }
718
719 /**
720 * Accept a path only if any one of filters given in the
721 * constructor do.
722 */
723 private static class MultiPathFilter implements PathFilter {
724 private List<PathFilter> filters;
725
726 public MultiPathFilter() {
727 this.filters = new ArrayList<PathFilter>();
728 }
729
730 public MultiPathFilter(List<PathFilter> filters) {
731 this.filters = filters;
732 }
733
734 public void add(PathFilter one) {
735 filters.add(one);
736 }
737
738 public boolean accept(Path path) {
739 for (PathFilter filter : filters) {
740 if (filter.accept(path)) {
741 return true;
742 }
743 }
744 return false;
745 }
746
747 public String toString() {
748 StringBuffer buf = new StringBuffer();
749 buf.append("[");
750 for (PathFilter f: filters) {
751 buf.append(f);
752 buf.append(",");
753 }
754 buf.append("]");
755 return buf.toString();
756 }
757 }
758 }