001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.input;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.io.Text;
029import org.apache.hadoop.io.Writable;
030import org.apache.hadoop.mapreduce.InputSplit;
031import org.apache.hadoop.mapreduce.RecordReader;
032
033/**
034 * A sub-collection of input files. 
035 * 
036 * Unlike {@link FileSplit}, CombineFileSplit class does not represent 
037 * a split of a file, but a split of input files into smaller sets. 
038 * A split may contain blocks from different file but all 
039 * the blocks in the same split are probably local to some rack <br> 
040 * CombineFileSplit can be used to implement {@link RecordReader}'s, 
041 * with reading one record per file.
042 * 
043 * @see FileSplit
044 * @see CombineFileInputFormat 
045 */
046@InterfaceAudience.Public
047@InterfaceStability.Stable
048public class CombineFileSplit extends InputSplit implements Writable {
049
050  private Path[] paths;
051  private long[] startoffset;
052  private long[] lengths;
053  private String[] locations;
054  private long totLength;
055
056  /**
057   * default constructor
058   */
059  public CombineFileSplit() {}
060  public CombineFileSplit(Path[] files, long[] start, 
061                          long[] lengths, String[] locations) {
062    initSplit(files, start, lengths, locations);
063  }
064
065  public CombineFileSplit(Path[] files, long[] lengths) {
066    long[] startoffset = new long[files.length];
067    for (int i = 0; i < startoffset.length; i++) {
068      startoffset[i] = 0;
069    }
070    String[] locations = new String[files.length];
071    for (int i = 0; i < locations.length; i++) {
072      locations[i] = "";
073    }
074    initSplit(files, startoffset, lengths, locations);
075  }
076  
077  private void initSplit(Path[] files, long[] start, 
078                         long[] lengths, String[] locations) {
079    this.startoffset = start;
080    this.lengths = lengths;
081    this.paths = files;
082    this.totLength = 0;
083    this.locations = locations;
084    for(long length : lengths) {
085      totLength += length;
086    }
087  }
088
089  /**
090   * Copy constructor
091   */
092  public CombineFileSplit(CombineFileSplit old) throws IOException {
093    this(old.getPaths(), old.getStartOffsets(),
094         old.getLengths(), old.getLocations());
095  }
096
097  public long getLength() {
098    return totLength;
099  }
100
101  /** Returns an array containing the start offsets of the files in the split*/ 
102  public long[] getStartOffsets() {
103    return startoffset;
104  }
105  
106  /** Returns an array containing the lengths of the files in the split*/ 
107  public long[] getLengths() {
108    return lengths;
109  }
110
111  /** Returns the start offset of the i<sup>th</sup> Path */
112  public long getOffset(int i) {
113    return startoffset[i];
114  }
115  
116  /** Returns the length of the i<sup>th</sup> Path */
117  public long getLength(int i) {
118    return lengths[i];
119  }
120  
121  /** Returns the number of Paths in the split */
122  public int getNumPaths() {
123    return paths.length;
124  }
125
126  /** Returns the i<sup>th</sup> Path */
127  public Path getPath(int i) {
128    return paths[i];
129  }
130  
131  /** Returns all the Paths in the split */
132  public Path[] getPaths() {
133    return paths;
134  }
135
136  /** Returns all the Paths where this input-split resides */
137  public String[] getLocations() throws IOException {
138    return locations;
139  }
140
141  public void readFields(DataInput in) throws IOException {
142    totLength = in.readLong();
143    int arrLength = in.readInt();
144    lengths = new long[arrLength];
145    for(int i=0; i<arrLength;i++) {
146      lengths[i] = in.readLong();
147    }
148    int filesLength = in.readInt();
149    paths = new Path[filesLength];
150    for(int i=0; i<filesLength;i++) {
151      paths[i] = new Path(Text.readString(in));
152    }
153    arrLength = in.readInt();
154    startoffset = new long[arrLength];
155    for(int i=0; i<arrLength;i++) {
156      startoffset[i] = in.readLong();
157    }
158  }
159
160  public void write(DataOutput out) throws IOException {
161    out.writeLong(totLength);
162    out.writeInt(lengths.length);
163    for(long length : lengths) {
164      out.writeLong(length);
165    }
166    out.writeInt(paths.length);
167    for(Path p : paths) {
168      Text.writeString(out, p.toString());
169    }
170    out.writeInt(startoffset.length);
171    for(long length : startoffset) {
172      out.writeLong(length);
173    }
174  }
175  
176  @Override
177 public String toString() {
178    StringBuffer sb = new StringBuffer();
179    for (int i = 0; i < paths.length; i++) {
180      if (i == 0 ) {
181        sb.append("Paths:");
182      }
183      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
184                "+" + lengths[i]);
185      if (i < paths.length -1) {
186        sb.append(",");
187      }
188    }
189    if (locations != null) {
190      String locs = "";
191      StringBuffer locsb = new StringBuffer();
192      for (int i = 0; i < locations.length; i++) {
193        locsb.append(locations[i] + ":");
194      }
195      locs = locsb.toString();
196      sb.append(" Locations:" + locs + "; ");
197    }
198    return sb.toString();
199  }
200}