/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * An abstract {@link org.apache.hadoop.mapred.InputFormat} that returns {@link CombineFileSplit}'s
 * in the {@link org.apache.hadoop.mapred.InputFormat#getSplits(JobConf, int)} method.
 * Splits are constructed from the files under the input paths.
 * A split cannot have files from different pools.
 * Each split returned may contain blocks from different files.
 * If a maxSplitSize is specified, then blocks on the same node are
 * combined to form a single split. Blocks that are left over are
 * then combined with other blocks in the same rack.
 * If maxSplitSize is not specified, then blocks from the same rack
 * are combined in a single split; no attempt is made to create
 * node-local splits.
 * If the maxSplitSize is equal to the block size, then this class
 * is similar to the default splitting behaviour in Hadoop: each
 * block is a locally processed split.
 * Subclasses implement {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
 * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
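 *
 * <p>A minimal subclass sketch; the <code>MyCombineFormat</code> and
 * <code>MyRecordReader</code> names are illustrative assumptions, not part of
 * this API. It wires each <code>CombineFileSplit</code> to a per-file reader
 * via {@link CombineFileRecordReader}:
 * <pre>
 * public class MyCombineFormat
 *     extends CombineFileInputFormat&lt;LongWritable, Text&gt; {
 *
 *   public MyCombineFormat() {
 *     // Assumed tuning choice, not a requirement: cap node-local
 *     // splits at 128 MB.
 *     setMaxSplitSize(128 * 1024 * 1024);
 *   }
 *
 *   public RecordReader&lt;LongWritable, Text&gt; getRecordReader(
 *       InputSplit split, JobConf job, Reporter reporter)
 *       throws IOException {
 *     // MyRecordReader is a hypothetical per-file reader; it must declare
 *     // the (CombineFileSplit, Configuration, Reporter, Integer)
 *     // constructor that CombineFileRecordReader instantiates reflectively.
 *     return new CombineFileRecordReader&lt;LongWritable, Text&gt;(job,
 *         (CombineFileSplit) split, reporter, MyRecordReader.class);
 *   }
 * }
 * </pre>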
 * @see CombineFileSplit
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class CombineFileInputFormat<K, V>
  extends org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat<K, V>
  implements InputFormat<K, V> {

  /**
   * Default constructor.
   */
  public CombineFileInputFormat() {
  }

  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    // Delegate split computation to the new-API implementation, then convert
    // each new-style split back into an old-style CombineFileSplit.
    List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =
      super.getSplits(new Job(job));
    InputSplit[] ret = new InputSplit[newStyleSplits.size()];
    for (int pos = 0; pos < newStyleSplits.size(); ++pos) {
      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit newStyleSplit =
        (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) newStyleSplits.get(pos);
      ret[pos] = new CombineFileSplit(job, newStyleSplit.getPaths(),
        newStyleSplit.getStartOffsets(), newStyleSplit.getLengths(),
        newStyleSplit.getLocations());
    }
    return ret;
  }

  /**
   * Create a new pool and add the filters to it.
   * A split cannot have files from different pools.
   * @deprecated Use {@link #createPool(List)}.
   */
  @Deprecated
  protected void createPool(JobConf conf, List<PathFilter> filters) {
    createPool(filters);
  }

  /**
   * Create a new pool and add the filters to it.
   * A pathname can satisfy any one of the specified filters.
   * A split cannot have files from different pools.
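   *
   * <p>A sketch of pool usage via the replacement
   * {@link #createPool(PathFilter...)}, called from a hypothetical subclass
   * constructor; it keeps text and sequence files in separate pools so the
   * two kinds are never combined into the same split:
   * <pre>
   * createPool(new PathFilter() {
   *   public boolean accept(Path path) {
   *     return path.getName().endsWith(".txt");
   *   }
   * });
   * createPool(new PathFilter() {
   *   public boolean accept(Path path) {
   *     return path.getName().endsWith(".seq");
   *   }
   * });
   * </pre>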
   * @deprecated Use {@link #createPool(PathFilter...)}.
   */
  @Deprecated
  protected void createPool(JobConf conf, PathFilter... filters) {
    createPool(filters);
  }

  /**
   * Subclasses must implement this method to construct a
   * <code>RecordReader</code> for a <code>CombineFileSplit</code>.
   */
  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
                                      JobConf job, Reporter reporter)
    throws IOException;

  // Abstract method from the new-API super class, implemented to return null:
  // record readers for this old-API class are created through
  // getRecordReader(InputSplit, JobConf, Reporter) instead.
  public org.apache.hadoop.mapreduce.RecordReader<K, V> createRecordReader(
      org.apache.hadoop.mapreduce.InputSplit split,
      TaskAttemptContext context) throws IOException {
    return null;
  }

  /**
   * List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if zero items are found.
   */
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    List<FileStatus> result = super.listStatus(new Job(job));
    return result.toArray(new FileStatus[result.size()]);
  }

  /**
   * Subclasses should avoid overriding this method and should instead only
   * override {@link #isSplitable(FileSystem, Path)}. The implementation of
   * this method simply calls the other method to preserve compatibility.
   * @see <a href="https://issues.apache.org/jira/browse/MAPREDUCE-5530">
   * MAPREDUCE-5530</a>
   *
   * @param context the job context
   * @param file the file name to check
   * @return is this file splitable?
   */
  @InterfaceAudience.Private
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    try {
      return isSplitable(FileSystem.get(context.getConfiguration()), file);
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
  }

  protected boolean isSplitable(FileSystem fs, Path file) {
    // A file is splittable if it is uncompressed, or if its codec supports
    // splitting.
    final CompressionCodec codec =
      new CompressionCodecFactory(fs.getConf()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }
}