001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.input; 020 021 import java.io.DataInput; 022 import java.io.DataOutput; 023 import java.io.IOException; 024 025 import org.apache.hadoop.classification.InterfaceAudience; 026 import org.apache.hadoop.classification.InterfaceStability; 027 import org.apache.hadoop.fs.Path; 028 import org.apache.hadoop.io.Text; 029 import org.apache.hadoop.io.Writable; 030 import org.apache.hadoop.mapreduce.InputSplit; 031 import org.apache.hadoop.mapreduce.RecordReader; 032 033 /** 034 * A sub-collection of input files. 035 * 036 * Unlike {@link FileSplit}, CombineFileSplit class does not represent 037 * a split of a file, but a split of input files into smaller sets. 038 * A split may contain blocks from different file but all 039 * the blocks in the same split are probably local to some rack <br> 040 * CombineFileSplit can be used to implement {@link RecordReader}'s, 041 * with reading one record per file. 042 * 043 * @see FileSplit 044 * @see CombineFileInputFormat 045 */ 046 @InterfaceAudience.Public 047 @InterfaceStability.Stable 048 public class CombineFileSplit extends InputSplit implements Writable { 049 050 private Path[] paths; 051 private long[] startoffset; 052 private long[] lengths; 053 private String[] locations; 054 private long totLength; 055 056 /** 057 * default constructor 058 */ 059 public CombineFileSplit() {} 060 public CombineFileSplit(Path[] files, long[] start, 061 long[] lengths, String[] locations) { 062 initSplit(files, start, lengths, locations); 063 } 064 065 public CombineFileSplit(Path[] files, long[] lengths) { 066 long[] startoffset = new long[files.length]; 067 for (int i = 0; i < startoffset.length; i++) { 068 startoffset[i] = 0; 069 } 070 String[] locations = new String[files.length]; 071 for (int i = 0; i < locations.length; i++) { 072 locations[i] = ""; 073 } 074 initSplit(files, startoffset, lengths, locations); 075 } 076 077 private void initSplit(Path[] files, long[] start, 078 long[] lengths, String[] locations) { 079 this.startoffset = start; 080 this.lengths = lengths; 081 this.paths = files; 082 this.totLength = 0; 083 this.locations = locations; 084 for(long length : lengths) { 085 totLength += length; 086 } 087 } 088 089 /** 090 * Copy constructor 091 */ 092 public CombineFileSplit(CombineFileSplit old) throws IOException { 093 this(old.getPaths(), old.getStartOffsets(), 094 old.getLengths(), old.getLocations()); 095 } 096 097 public long getLength() { 098 return totLength; 099 } 100 101 /** Returns an array containing the start offsets of the files in the split*/ 102 public long[] getStartOffsets() { 103 return startoffset; 104 } 105 106 /** Returns an array containing the lengths of the files in the split*/ 107 public long[] getLengths() { 108 return lengths; 109 } 110 111 /** Returns the start offset of the i<sup>th</sup> Path */ 112 public long getOffset(int i) { 113 return startoffset[i]; 114 } 115 116 /** Returns the length of the i<sup>th</sup> Path */ 117 public long getLength(int i) { 118 return lengths[i]; 119 } 120 121 /** Returns the number of Paths in the split */ 122 public int getNumPaths() { 123 return paths.length; 124 } 125 126 /** Returns the i<sup>th</sup> Path */ 127 public Path getPath(int i) { 128 return paths[i]; 129 } 130 131 /** Returns all the Paths in the split */ 132 public Path[] getPaths() { 133 return paths; 134 } 135 136 /** Returns all the Paths where this input-split resides */ 137 public String[] getLocations() throws IOException { 138 return locations; 139 } 140 141 public void readFields(DataInput in) throws IOException { 142 totLength = in.readLong(); 143 int arrLength = in.readInt(); 144 lengths = new long[arrLength]; 145 for(int i=0; i<arrLength;i++) { 146 lengths[i] = in.readLong(); 147 } 148 int filesLength = in.readInt(); 149 paths = new Path[filesLength]; 150 for(int i=0; i<filesLength;i++) { 151 paths[i] = new Path(Text.readString(in)); 152 } 153 arrLength = in.readInt(); 154 startoffset = new long[arrLength]; 155 for(int i=0; i<arrLength;i++) { 156 startoffset[i] = in.readLong(); 157 } 158 } 159 160 public void write(DataOutput out) throws IOException { 161 out.writeLong(totLength); 162 out.writeInt(lengths.length); 163 for(long length : lengths) { 164 out.writeLong(length); 165 } 166 out.writeInt(paths.length); 167 for(Path p : paths) { 168 Text.writeString(out, p.toString()); 169 } 170 out.writeInt(startoffset.length); 171 for(long length : startoffset) { 172 out.writeLong(length); 173 } 174 } 175 176 @Override 177 public String toString() { 178 StringBuffer sb = new StringBuffer(); 179 for (int i = 0; i < paths.length; i++) { 180 if (i == 0 ) { 181 sb.append("Paths:"); 182 } 183 sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] + 184 "+" + lengths[i]); 185 if (i < paths.length -1) { 186 sb.append(","); 187 } 188 } 189 if (locations != null) { 190 String locs = ""; 191 StringBuffer locsb = new StringBuffer(); 192 for (int i = 0; i < locations.length; i++) { 193 locsb.append(locations[i] + ":"); 194 } 195 locs = locsb.toString(); 196 sb.append(" Locations:" + locs + "; "); 197 } 198 return sb.toString(); 199 } 200 }