001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.DataInput;
022    import java.io.DataOutput;
023    import java.io.IOException;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.fs.Path;
028    import org.apache.hadoop.io.Text;
029    import org.apache.hadoop.io.Writable;
030    import org.apache.hadoop.mapreduce.InputSplit;
031    import org.apache.hadoop.mapreduce.RecordReader;
032    
033    /**
034     * A sub-collection of input files. 
035     * 
036     * Unlike {@link FileSplit}, CombineFileSplit class does not represent 
037     * a split of a file, but a split of input files into smaller sets. 
038     * A split may contain blocks from different file but all 
039     * the blocks in the same split are probably local to some rack <br> 
040     * CombineFileSplit can be used to implement {@link RecordReader}'s, 
041     * with reading one record per file.
042     * 
043     * @see FileSplit
044     * @see CombineFileInputFormat 
045     */
046    @InterfaceAudience.Public
047    @InterfaceStability.Stable
048    public class CombineFileSplit extends InputSplit implements Writable {
049    
050      private Path[] paths;
051      private long[] startoffset;
052      private long[] lengths;
053      private String[] locations;
054      private long totLength;
055    
056      /**
057       * default constructor
058       */
059      public CombineFileSplit() {}
060      public CombineFileSplit(Path[] files, long[] start, 
061                              long[] lengths, String[] locations) {
062        initSplit(files, start, lengths, locations);
063      }
064    
065      public CombineFileSplit(Path[] files, long[] lengths) {
066        long[] startoffset = new long[files.length];
067        for (int i = 0; i < startoffset.length; i++) {
068          startoffset[i] = 0;
069        }
070        String[] locations = new String[files.length];
071        for (int i = 0; i < locations.length; i++) {
072          locations[i] = "";
073        }
074        initSplit(files, startoffset, lengths, locations);
075      }
076      
077      private void initSplit(Path[] files, long[] start, 
078                             long[] lengths, String[] locations) {
079        this.startoffset = start;
080        this.lengths = lengths;
081        this.paths = files;
082        this.totLength = 0;
083        this.locations = locations;
084        for(long length : lengths) {
085          totLength += length;
086        }
087      }
088    
089      /**
090       * Copy constructor
091       */
092      public CombineFileSplit(CombineFileSplit old) throws IOException {
093        this(old.getPaths(), old.getStartOffsets(),
094             old.getLengths(), old.getLocations());
095      }
096    
097      public long getLength() {
098        return totLength;
099      }
100    
101      /** Returns an array containing the start offsets of the files in the split*/ 
102      public long[] getStartOffsets() {
103        return startoffset;
104      }
105      
106      /** Returns an array containing the lengths of the files in the split*/ 
107      public long[] getLengths() {
108        return lengths;
109      }
110    
111      /** Returns the start offset of the i<sup>th</sup> Path */
112      public long getOffset(int i) {
113        return startoffset[i];
114      }
115      
116      /** Returns the length of the i<sup>th</sup> Path */
117      public long getLength(int i) {
118        return lengths[i];
119      }
120      
121      /** Returns the number of Paths in the split */
122      public int getNumPaths() {
123        return paths.length;
124      }
125    
126      /** Returns the i<sup>th</sup> Path */
127      public Path getPath(int i) {
128        return paths[i];
129      }
130      
131      /** Returns all the Paths in the split */
132      public Path[] getPaths() {
133        return paths;
134      }
135    
136      /** Returns all the Paths where this input-split resides */
137      public String[] getLocations() throws IOException {
138        return locations;
139      }
140    
141      public void readFields(DataInput in) throws IOException {
142        totLength = in.readLong();
143        int arrLength = in.readInt();
144        lengths = new long[arrLength];
145        for(int i=0; i<arrLength;i++) {
146          lengths[i] = in.readLong();
147        }
148        int filesLength = in.readInt();
149        paths = new Path[filesLength];
150        for(int i=0; i<filesLength;i++) {
151          paths[i] = new Path(Text.readString(in));
152        }
153        arrLength = in.readInt();
154        startoffset = new long[arrLength];
155        for(int i=0; i<arrLength;i++) {
156          startoffset[i] = in.readLong();
157        }
158      }
159    
160      public void write(DataOutput out) throws IOException {
161        out.writeLong(totLength);
162        out.writeInt(lengths.length);
163        for(long length : lengths) {
164          out.writeLong(length);
165        }
166        out.writeInt(paths.length);
167        for(Path p : paths) {
168          Text.writeString(out, p.toString());
169        }
170        out.writeInt(startoffset.length);
171        for(long length : startoffset) {
172          out.writeLong(length);
173        }
174      }
175      
176      @Override
177     public String toString() {
178        StringBuffer sb = new StringBuffer();
179        for (int i = 0; i < paths.length; i++) {
180          if (i == 0 ) {
181            sb.append("Paths:");
182          }
183          sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
184                    "+" + lengths[i]);
185          if (i < paths.length -1) {
186            sb.append(",");
187          }
188        }
189        if (locations != null) {
190          String locs = "";
191          StringBuffer locsb = new StringBuffer();
192          for (int i = 0; i < locations.length; i++) {
193            locsb.append(locations[i] + ":");
194          }
195          locs = locsb.toString();
196          sb.append(" Locations:" + locs + "; ");
197        }
198        return sb.toString();
199      }
200    }