001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.input;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.List;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.hadoop.fs.FileStatus;
031 import org.apache.hadoop.fs.FileSystem;
032 import org.apache.hadoop.fs.Path;
033 import org.apache.hadoop.fs.PathFilter;
034 import org.apache.hadoop.fs.BlockLocation;
035 import org.apache.hadoop.mapreduce.InputFormat;
036 import org.apache.hadoop.mapreduce.InputSplit;
037 import org.apache.hadoop.mapreduce.Job;
038 import org.apache.hadoop.mapreduce.JobContext;
039 import org.apache.hadoop.mapreduce.Mapper;
040 import org.apache.hadoop.mapreduce.security.TokenCache;
041 import org.apache.hadoop.util.ReflectionUtils;
042 import org.apache.hadoop.util.StringUtils;
043
044 /**
045 * A base class for file-based {@link InputFormat}s.
046 *
047 * <p><code>FileInputFormat</code> is the base class for all file-based
048 * <code>InputFormat</code>s. This provides a generic implementation of
049 * {@link #getSplits(JobContext)}.
050 * Subclasses of <code>FileInputFormat</code> can also override the
051 * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
052 * not split-up and are processed as a whole by {@link Mapper}s.
053 */
054 @InterfaceAudience.Public
055 @InterfaceStability.Stable
056 public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
057 public static final String INPUT_DIR =
058 "mapreduce.input.fileinputformat.inputdir";
059 public static final String SPLIT_MAXSIZE =
060 "mapreduce.input.fileinputformat.split.maxsize";
061 public static final String SPLIT_MINSIZE =
062 "mapreduce.input.fileinputformat.split.minsize";
063 public static final String PATHFILTER_CLASS =
064 "mapreduce.input.pathFilter.class";
065 public static final String NUM_INPUT_FILES =
066 "mapreduce.input.fileinputformat.numinputfiles";
067
068 private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
069
070 private static final double SPLIT_SLOP = 1.1; // 10% slop
071
072 private static final PathFilter hiddenFileFilter = new PathFilter(){
073 public boolean accept(Path p){
074 String name = p.getName();
075 return !name.startsWith("_") && !name.startsWith(".");
076 }
077 };
078
079 /**
080 * Proxy PathFilter that accepts a path only if all filters given in the
081 * constructor do. Used by the listPaths() to apply the built-in
082 * hiddenFileFilter together with a user provided one (if any).
083 */
084 private static class MultiPathFilter implements PathFilter {
085 private List<PathFilter> filters;
086
087 public MultiPathFilter(List<PathFilter> filters) {
088 this.filters = filters;
089 }
090
091 public boolean accept(Path path) {
092 for (PathFilter filter : filters) {
093 if (!filter.accept(path)) {
094 return false;
095 }
096 }
097 return true;
098 }
099 }
100
101 /**
102 * Get the lower bound on split size imposed by the format.
103 * @return the number of bytes of the minimal split for this format
104 */
105 protected long getFormatMinSplitSize() {
106 return 1;
107 }
108
109 /**
110 * Is the given filename splitable? Usually, true, but if the file is
111 * stream compressed, it will not be.
112 *
113 * <code>FileInputFormat</code> implementations can override this and return
114 * <code>false</code> to ensure that individual input files are never split-up
115 * so that {@link Mapper}s process entire files.
116 *
117 * @param context the job context
118 * @param filename the file name to check
119 * @return is this file splitable?
120 */
121 protected boolean isSplitable(JobContext context, Path filename) {
122 return true;
123 }
124
125 /**
126 * Set a PathFilter to be applied to the input paths for the map-reduce job.
127 * @param job the job to modify
128 * @param filter the PathFilter class use for filtering the input paths.
129 */
130 public static void setInputPathFilter(Job job,
131 Class<? extends PathFilter> filter) {
132 job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
133 PathFilter.class);
134 }
135
136 /**
137 * Set the minimum input split size
138 * @param job the job to modify
139 * @param size the minimum size
140 */
141 public static void setMinInputSplitSize(Job job,
142 long size) {
143 job.getConfiguration().setLong(SPLIT_MINSIZE, size);
144 }
145
146 /**
147 * Get the minimum split size
148 * @param job the job
149 * @return the minimum number of bytes that can be in a split
150 */
151 public static long getMinSplitSize(JobContext job) {
152 return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
153 }
154
155 /**
156 * Set the maximum split size
157 * @param job the job to modify
158 * @param size the maximum split size
159 */
160 public static void setMaxInputSplitSize(Job job,
161 long size) {
162 job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
163 }
164
165 /**
166 * Get the maximum split size.
167 * @param context the job to look at.
168 * @return the maximum number of bytes a split can include
169 */
170 public static long getMaxSplitSize(JobContext context) {
171 return context.getConfiguration().getLong(SPLIT_MAXSIZE,
172 Long.MAX_VALUE);
173 }
174
175 /**
176 * Get a PathFilter instance of the filter set for the input paths.
177 *
178 * @return the PathFilter instance set for the job, NULL if none has been set.
179 */
180 public static PathFilter getInputPathFilter(JobContext context) {
181 Configuration conf = context.getConfiguration();
182 Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
183 PathFilter.class);
184 return (filterClass != null) ?
185 (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
186 }
187
188 /** List input directories.
189 * Subclasses may override to, e.g., select only files matching a regular
190 * expression.
191 *
192 * @param job the job to list input paths for
193 * @return array of FileStatus objects
194 * @throws IOException if zero items.
195 */
196 protected List<FileStatus> listStatus(JobContext job
197 ) throws IOException {
198 List<FileStatus> result = new ArrayList<FileStatus>();
199 Path[] dirs = getInputPaths(job);
200 if (dirs.length == 0) {
201 throw new IOException("No input paths specified in job");
202 }
203
204 // get tokens for all the required FileSystems..
205 TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
206 job.getConfiguration());
207
208 List<IOException> errors = new ArrayList<IOException>();
209
210 // creates a MultiPathFilter with the hiddenFileFilter and the
211 // user provided one (if any).
212 List<PathFilter> filters = new ArrayList<PathFilter>();
213 filters.add(hiddenFileFilter);
214 PathFilter jobFilter = getInputPathFilter(job);
215 if (jobFilter != null) {
216 filters.add(jobFilter);
217 }
218 PathFilter inputFilter = new MultiPathFilter(filters);
219
220 for (int i=0; i < dirs.length; ++i) {
221 Path p = dirs[i];
222 FileSystem fs = p.getFileSystem(job.getConfiguration());
223 FileStatus[] matches = fs.globStatus(p, inputFilter);
224 if (matches == null) {
225 errors.add(new IOException("Input path does not exist: " + p));
226 } else if (matches.length == 0) {
227 errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
228 } else {
229 for (FileStatus globStat: matches) {
230 if (globStat.isDirectory()) {
231 for(FileStatus stat: fs.listStatus(globStat.getPath(),
232 inputFilter)) {
233 result.add(stat);
234 }
235 } else {
236 result.add(globStat);
237 }
238 }
239 }
240 }
241
242 if (!errors.isEmpty()) {
243 throw new InvalidInputException(errors);
244 }
245 LOG.info("Total input paths to process : " + result.size());
246 return result;
247 }
248
249 /**
250 * A factory that makes the split for this class. It can be overridden
251 * by sub-classes to make sub-types
252 */
253 protected FileSplit makeSplit(Path file, long start, long length,
254 String[] hosts) {
255 return new FileSplit(file, start, length, hosts);
256 }
257
258 /**
259 * Generate the list of files and make them into FileSplits.
260 * @param job the job context
261 * @throws IOException
262 */
263 public List<InputSplit> getSplits(JobContext job) throws IOException {
264 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
265 long maxSize = getMaxSplitSize(job);
266
267 // generate splits
268 List<InputSplit> splits = new ArrayList<InputSplit>();
269 List<FileStatus> files = listStatus(job);
270 for (FileStatus file: files) {
271 Path path = file.getPath();
272 long length = file.getLen();
273 if (length != 0) {
274 FileSystem fs = path.getFileSystem(job.getConfiguration());
275 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
276 if (isSplitable(job, path)) {
277 long blockSize = file.getBlockSize();
278 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
279
280 long bytesRemaining = length;
281 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
282 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
283 splits.add(makeSplit(path, length-bytesRemaining, splitSize,
284 blkLocations[blkIndex].getHosts()));
285 bytesRemaining -= splitSize;
286 }
287
288 if (bytesRemaining != 0) {
289 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
290 splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
291 blkLocations[blkIndex].getHosts()));
292 }
293 } else { // not splitable
294 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
295 }
296 } else {
297 //Create empty hosts array for zero length files
298 splits.add(makeSplit(path, 0, length, new String[0]));
299 }
300 }
301 // Save the number of input files for metrics/loadgen
302 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
303 LOG.debug("Total # of splits: " + splits.size());
304 return splits;
305 }
306
307 protected long computeSplitSize(long blockSize, long minSize,
308 long maxSize) {
309 return Math.max(minSize, Math.min(maxSize, blockSize));
310 }
311
312 protected int getBlockIndex(BlockLocation[] blkLocations,
313 long offset) {
314 for (int i = 0 ; i < blkLocations.length; i++) {
315 // is the offset inside this block?
316 if ((blkLocations[i].getOffset() <= offset) &&
317 (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
318 return i;
319 }
320 }
321 BlockLocation last = blkLocations[blkLocations.length -1];
322 long fileLength = last.getOffset() + last.getLength() -1;
323 throw new IllegalArgumentException("Offset " + offset +
324 " is outside of file (0.." +
325 fileLength + ")");
326 }
327
328 /**
329 * Sets the given comma separated paths as the list of inputs
330 * for the map-reduce job.
331 *
332 * @param job the job
333 * @param commaSeparatedPaths Comma separated paths to be set as
334 * the list of inputs for the map-reduce job.
335 */
336 public static void setInputPaths(Job job,
337 String commaSeparatedPaths
338 ) throws IOException {
339 setInputPaths(job, StringUtils.stringToPath(
340 getPathStrings(commaSeparatedPaths)));
341 }
342
343 /**
344 * Add the given comma separated paths to the list of inputs for
345 * the map-reduce job.
346 *
347 * @param job The job to modify
348 * @param commaSeparatedPaths Comma separated paths to be added to
349 * the list of inputs for the map-reduce job.
350 */
351 public static void addInputPaths(Job job,
352 String commaSeparatedPaths
353 ) throws IOException {
354 for (String str : getPathStrings(commaSeparatedPaths)) {
355 addInputPath(job, new Path(str));
356 }
357 }
358
359 /**
360 * Set the array of {@link Path}s as the list of inputs
361 * for the map-reduce job.
362 *
363 * @param job The job to modify
364 * @param inputPaths the {@link Path}s of the input directories/files
365 * for the map-reduce job.
366 */
367 public static void setInputPaths(Job job,
368 Path... inputPaths) throws IOException {
369 Configuration conf = job.getConfiguration();
370 Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
371 StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
372 for(int i = 1; i < inputPaths.length;i++) {
373 str.append(StringUtils.COMMA_STR);
374 path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
375 str.append(StringUtils.escapeString(path.toString()));
376 }
377 conf.set(INPUT_DIR, str.toString());
378 }
379
380 /**
381 * Add a {@link Path} to the list of inputs for the map-reduce job.
382 *
383 * @param job The {@link Job} to modify
384 * @param path {@link Path} to be added to the list of inputs for
385 * the map-reduce job.
386 */
387 public static void addInputPath(Job job,
388 Path path) throws IOException {
389 Configuration conf = job.getConfiguration();
390 path = path.getFileSystem(conf).makeQualified(path);
391 String dirStr = StringUtils.escapeString(path.toString());
392 String dirs = conf.get(INPUT_DIR);
393 conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
394 }
395
396 // This method escapes commas in the glob pattern of the given paths.
397 private static String[] getPathStrings(String commaSeparatedPaths) {
398 int length = commaSeparatedPaths.length();
399 int curlyOpen = 0;
400 int pathStart = 0;
401 boolean globPattern = false;
402 List<String> pathStrings = new ArrayList<String>();
403
404 for (int i=0; i<length; i++) {
405 char ch = commaSeparatedPaths.charAt(i);
406 switch(ch) {
407 case '{' : {
408 curlyOpen++;
409 if (!globPattern) {
410 globPattern = true;
411 }
412 break;
413 }
414 case '}' : {
415 curlyOpen--;
416 if (curlyOpen == 0 && globPattern) {
417 globPattern = false;
418 }
419 break;
420 }
421 case ',' : {
422 if (!globPattern) {
423 pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
424 pathStart = i + 1 ;
425 }
426 break;
427 }
428 default:
429 continue; // nothing special to do for this character
430 }
431 }
432 pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
433
434 return pathStrings.toArray(new String[0]);
435 }
436
437 /**
438 * Get the list of input {@link Path}s for the map-reduce job.
439 *
440 * @param context The job
441 * @return the list of input {@link Path}s for the map-reduce job.
442 */
443 public static Path[] getInputPaths(JobContext context) {
444 String dirs = context.getConfiguration().get(INPUT_DIR, "");
445 String [] list = StringUtils.split(dirs);
446 Path[] result = new Path[list.length];
447 for (int i = 0; i < list.length; i++) {
448 result[i] = new Path(StringUtils.unEscapeString(list[i]));
449 }
450 return result;
451 }
452
453 }