001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.input;
020
021 import java.io.DataInput;
022 import java.io.DataOutput;
023 import java.io.IOException;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.fs.Path;
028 import org.apache.hadoop.io.Text;
029 import org.apache.hadoop.io.Writable;
030 import org.apache.hadoop.mapreduce.InputSplit;
031 import org.apache.hadoop.mapreduce.RecordReader;
032
033 /**
034 * A sub-collection of input files.
035 *
036 * Unlike {@link FileSplit}, CombineFileSplit class does not represent
037 * a split of a file, but a split of input files into smaller sets.
038 * A split may contain blocks from different file but all
039 * the blocks in the same split are probably local to some rack <br>
040 * CombineFileSplit can be used to implement {@link RecordReader}'s,
041 * with reading one record per file.
042 *
043 * @see FileSplit
044 * @see CombineFileInputFormat
045 */
046 @InterfaceAudience.Public
047 @InterfaceStability.Stable
048 public class CombineFileSplit extends InputSplit implements Writable {
049
050 private Path[] paths;
051 private long[] startoffset;
052 private long[] lengths;
053 private String[] locations;
054 private long totLength;
055
056 /**
057 * default constructor
058 */
059 public CombineFileSplit() {}
060 public CombineFileSplit(Path[] files, long[] start,
061 long[] lengths, String[] locations) {
062 initSplit(files, start, lengths, locations);
063 }
064
065 public CombineFileSplit(Path[] files, long[] lengths) {
066 long[] startoffset = new long[files.length];
067 for (int i = 0; i < startoffset.length; i++) {
068 startoffset[i] = 0;
069 }
070 String[] locations = new String[files.length];
071 for (int i = 0; i < locations.length; i++) {
072 locations[i] = "";
073 }
074 initSplit(files, startoffset, lengths, locations);
075 }
076
077 private void initSplit(Path[] files, long[] start,
078 long[] lengths, String[] locations) {
079 this.startoffset = start;
080 this.lengths = lengths;
081 this.paths = files;
082 this.totLength = 0;
083 this.locations = locations;
084 for(long length : lengths) {
085 totLength += length;
086 }
087 }
088
089 /**
090 * Copy constructor
091 */
092 public CombineFileSplit(CombineFileSplit old) throws IOException {
093 this(old.getPaths(), old.getStartOffsets(),
094 old.getLengths(), old.getLocations());
095 }
096
097 public long getLength() {
098 return totLength;
099 }
100
101 /** Returns an array containing the start offsets of the files in the split*/
102 public long[] getStartOffsets() {
103 return startoffset;
104 }
105
106 /** Returns an array containing the lengths of the files in the split*/
107 public long[] getLengths() {
108 return lengths;
109 }
110
111 /** Returns the start offset of the i<sup>th</sup> Path */
112 public long getOffset(int i) {
113 return startoffset[i];
114 }
115
116 /** Returns the length of the i<sup>th</sup> Path */
117 public long getLength(int i) {
118 return lengths[i];
119 }
120
121 /** Returns the number of Paths in the split */
122 public int getNumPaths() {
123 return paths.length;
124 }
125
126 /** Returns the i<sup>th</sup> Path */
127 public Path getPath(int i) {
128 return paths[i];
129 }
130
131 /** Returns all the Paths in the split */
132 public Path[] getPaths() {
133 return paths;
134 }
135
136 /** Returns all the Paths where this input-split resides */
137 public String[] getLocations() throws IOException {
138 return locations;
139 }
140
141 public void readFields(DataInput in) throws IOException {
142 totLength = in.readLong();
143 int arrLength = in.readInt();
144 lengths = new long[arrLength];
145 for(int i=0; i<arrLength;i++) {
146 lengths[i] = in.readLong();
147 }
148 int filesLength = in.readInt();
149 paths = new Path[filesLength];
150 for(int i=0; i<filesLength;i++) {
151 paths[i] = new Path(Text.readString(in));
152 }
153 arrLength = in.readInt();
154 startoffset = new long[arrLength];
155 for(int i=0; i<arrLength;i++) {
156 startoffset[i] = in.readLong();
157 }
158 }
159
160 public void write(DataOutput out) throws IOException {
161 out.writeLong(totLength);
162 out.writeInt(lengths.length);
163 for(long length : lengths) {
164 out.writeLong(length);
165 }
166 out.writeInt(paths.length);
167 for(Path p : paths) {
168 Text.writeString(out, p.toString());
169 }
170 out.writeInt(startoffset.length);
171 for(long length : startoffset) {
172 out.writeLong(length);
173 }
174 }
175
176 @Override
177 public String toString() {
178 StringBuffer sb = new StringBuffer();
179 for (int i = 0; i < paths.length; i++) {
180 if (i == 0 ) {
181 sb.append("Paths:");
182 }
183 sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
184 "+" + lengths[i]);
185 if (i < paths.length -1) {
186 sb.append(",");
187 }
188 }
189 if (locations != null) {
190 String locs = "";
191 StringBuffer locsb = new StringBuffer();
192 for (int i = 0; i < locations.length; i++) {
193 locsb.append(locations[i] + ":");
194 }
195 locs = locsb.toString();
196 sb.append(" Locations:" + locs + "; ");
197 }
198 return sb.toString();
199 }
200 }