001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.FileUtil;
029import org.apache.hadoop.fs.Path;
030
031/**
032 * An abstract {@link InputFormat} that returns {@link MultiFileSplit}'s
033 * in {@link #getSplits(JobConf, int)} method. Splits are constructed from 
034 * the files under the input paths. Each split returned contains <i>nearly</i>
035 * equal content length. <br>  
036 * Subclasses implement {@link #getRecordReader(InputSplit, JobConf, Reporter)}
037 * to construct <code>RecordReader</code>'s for <code>MultiFileSplit</code>'s.
038 * @see MultiFileSplit
039 */
040@InterfaceAudience.Public
041@InterfaceStability.Stable
042public abstract class MultiFileInputFormat<K, V>
043  extends FileInputFormat<K, V> {
044
045  @Override
046  public InputSplit[] getSplits(JobConf job, int numSplits) 
047    throws IOException {
048    
049    Path[] paths = FileUtil.stat2Paths(listStatus(job));
050    List<MultiFileSplit> splits = new ArrayList<MultiFileSplit>(Math.min(numSplits, paths.length));
051    if (paths.length != 0) {
052      // HADOOP-1818: Manage splits only if there are paths
053      long[] lengths = new long[paths.length];
054      long totLength = 0;
055      for(int i=0; i<paths.length; i++) {
056        FileSystem fs = paths[i].getFileSystem(job);
057        lengths[i] = fs.getContentSummary(paths[i]).getLength();
058        totLength += lengths[i];
059      }
060      double avgLengthPerSplit = ((double)totLength) / numSplits;
061      long cumulativeLength = 0;
062
063      int startIndex = 0;
064
065      for(int i=0; i<numSplits; i++) {
066        int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
067            , startIndex, lengths);
068        if (splitSize != 0) {
069          // HADOOP-1818: Manage split only if split size is not equals to 0
070          Path[] splitPaths = new Path[splitSize];
071          long[] splitLengths = new long[splitSize];
072          System.arraycopy(paths, startIndex, splitPaths , 0, splitSize);
073          System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize);
074          splits.add(new MultiFileSplit(job, splitPaths, splitLengths));
075          startIndex += splitSize;
076          for(long l: splitLengths) {
077            cumulativeLength += l;
078          }
079        }
080      }
081    }
082    return splits.toArray(new MultiFileSplit[splits.size()]);    
083  }
084
085  private int findSize(int splitIndex, double avgLengthPerSplit
086      , long cumulativeLength , int startIndex, long[] lengths) {
087    
088    if(splitIndex == lengths.length - 1)
089      return lengths.length - startIndex;
090    
091    long goalLength = (long)((splitIndex + 1) * avgLengthPerSplit);
092    long partialLength = 0;
093    // accumulate till just above the goal length;
094    for(int i = startIndex; i < lengths.length; i++) {
095      partialLength += lengths[i];
096      if(partialLength + cumulativeLength >= goalLength) {
097        return i - startIndex + 1;
098      }
099    }
100    return lengths.length - startIndex;
101  }
102  
103  @Override
104  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
105      JobConf job, Reporter reporter)
106      throws IOException;
107}