001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.map;
020
021 import java.io.IOException;
022 import java.net.URI;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.conf.Configuration.IntegerRanges;
028 import org.apache.hadoop.fs.Path;
029 import org.apache.hadoop.io.RawComparator;
030 import org.apache.hadoop.mapreduce.Counter;
031 import org.apache.hadoop.mapreduce.InputFormat;
032 import org.apache.hadoop.mapreduce.InputSplit;
033 import org.apache.hadoop.mapreduce.JobID;
034 import org.apache.hadoop.mapreduce.MapContext;
035 import org.apache.hadoop.mapreduce.Mapper;
036 import org.apache.hadoop.mapreduce.OutputCommitter;
037 import org.apache.hadoop.mapreduce.OutputFormat;
038 import org.apache.hadoop.mapreduce.Partitioner;
039 import org.apache.hadoop.mapreduce.Reducer;
040 import org.apache.hadoop.mapreduce.TaskAttemptID;
041 import org.apache.hadoop.security.Credentials;
042
043 /**
044 * A {@link Mapper} which wraps a given one to allow custom
045 * {@link Mapper.Context} implementations.
046 */
047 @InterfaceAudience.Public
048 @InterfaceStability.Evolving
049 public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
050 extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
051
052 /**
053 * Get a wrapped {@link Mapper.Context} for custom implementations.
054 * @param mapContext <code>MapContext</code> to be wrapped
055 * @return a wrapped <code>Mapper.Context</code> for custom implementations
056 */
057 public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context
058 getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
059 return new Context(mapContext);
060 }
061
062 @InterfaceStability.Evolving
063 public class Context
064 extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
065
066 protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;
067
068 public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
069 this.mapContext = mapContext;
070 }
071
072 /**
073 * Get the input split for this map.
074 */
075 public InputSplit getInputSplit() {
076 return mapContext.getInputSplit();
077 }
078
079 @Override
080 public KEYIN getCurrentKey() throws IOException, InterruptedException {
081 return mapContext.getCurrentKey();
082 }
083
084 @Override
085 public VALUEIN getCurrentValue() throws IOException, InterruptedException {
086 return mapContext.getCurrentValue();
087 }
088
089 @Override
090 public boolean nextKeyValue() throws IOException, InterruptedException {
091 return mapContext.nextKeyValue();
092 }
093
094 @Override
095 public Counter getCounter(Enum<?> counterName) {
096 return mapContext.getCounter(counterName);
097 }
098
099 @Override
100 public Counter getCounter(String groupName, String counterName) {
101 return mapContext.getCounter(groupName, counterName);
102 }
103
104 @Override
105 public OutputCommitter getOutputCommitter() {
106 return mapContext.getOutputCommitter();
107 }
108
109 @Override
110 public void write(KEYOUT key, VALUEOUT value) throws IOException,
111 InterruptedException {
112 mapContext.write(key, value);
113 }
114
115 @Override
116 public String getStatus() {
117 return mapContext.getStatus();
118 }
119
120 @Override
121 public TaskAttemptID getTaskAttemptID() {
122 return mapContext.getTaskAttemptID();
123 }
124
125 @Override
126 public void setStatus(String msg) {
127 mapContext.setStatus(msg);
128 }
129
130 @Override
131 public Path[] getArchiveClassPaths() {
132 return mapContext.getArchiveClassPaths();
133 }
134
135 @Override
136 public String[] getArchiveTimestamps() {
137 return mapContext.getArchiveTimestamps();
138 }
139
140 @Override
141 public URI[] getCacheArchives() throws IOException {
142 return mapContext.getCacheArchives();
143 }
144
145 @Override
146 public URI[] getCacheFiles() throws IOException {
147 return mapContext.getCacheFiles();
148 }
149
150 @Override
151 public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
152 throws ClassNotFoundException {
153 return mapContext.getCombinerClass();
154 }
155
156 @Override
157 public Configuration getConfiguration() {
158 return mapContext.getConfiguration();
159 }
160
161 @Override
162 public Path[] getFileClassPaths() {
163 return mapContext.getFileClassPaths();
164 }
165
166 @Override
167 public String[] getFileTimestamps() {
168 return mapContext.getFileTimestamps();
169 }
170
171 @Override
172 public RawComparator<?> getCombinerKeyGroupingComparator() {
173 return mapContext.getCombinerKeyGroupingComparator();
174 }
175
176 @Override
177 public RawComparator<?> getGroupingComparator() {
178 return mapContext.getGroupingComparator();
179 }
180
181 @Override
182 public Class<? extends InputFormat<?, ?>> getInputFormatClass()
183 throws ClassNotFoundException {
184 return mapContext.getInputFormatClass();
185 }
186
187 @Override
188 public String getJar() {
189 return mapContext.getJar();
190 }
191
192 @Override
193 public JobID getJobID() {
194 return mapContext.getJobID();
195 }
196
197 @Override
198 public String getJobName() {
199 return mapContext.getJobName();
200 }
201
202 @Override
203 public boolean getJobSetupCleanupNeeded() {
204 return mapContext.getJobSetupCleanupNeeded();
205 }
206
207 @Override
208 public boolean getTaskCleanupNeeded() {
209 return mapContext.getTaskCleanupNeeded();
210 }
211
212 @Override
213 public Path[] getLocalCacheArchives() throws IOException {
214 return mapContext.getLocalCacheArchives();
215 }
216
217 @Override
218 public Path[] getLocalCacheFiles() throws IOException {
219 return mapContext.getLocalCacheFiles();
220 }
221
222 @Override
223 public Class<?> getMapOutputKeyClass() {
224 return mapContext.getMapOutputKeyClass();
225 }
226
227 @Override
228 public Class<?> getMapOutputValueClass() {
229 return mapContext.getMapOutputValueClass();
230 }
231
232 @Override
233 public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
234 throws ClassNotFoundException {
235 return mapContext.getMapperClass();
236 }
237
238 @Override
239 public int getMaxMapAttempts() {
240 return mapContext.getMaxMapAttempts();
241 }
242
243 @Override
244 public int getMaxReduceAttempts() {
245 return mapContext.getMaxReduceAttempts();
246 }
247
248 @Override
249 public int getNumReduceTasks() {
250 return mapContext.getNumReduceTasks();
251 }
252
253 @Override
254 public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
255 throws ClassNotFoundException {
256 return mapContext.getOutputFormatClass();
257 }
258
259 @Override
260 public Class<?> getOutputKeyClass() {
261 return mapContext.getOutputKeyClass();
262 }
263
264 @Override
265 public Class<?> getOutputValueClass() {
266 return mapContext.getOutputValueClass();
267 }
268
269 @Override
270 public Class<? extends Partitioner<?, ?>> getPartitionerClass()
271 throws ClassNotFoundException {
272 return mapContext.getPartitionerClass();
273 }
274
275 @Override
276 public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
277 throws ClassNotFoundException {
278 return mapContext.getReducerClass();
279 }
280
281 @Override
282 public RawComparator<?> getSortComparator() {
283 return mapContext.getSortComparator();
284 }
285
286 @Override
287 public boolean getSymlink() {
288 return mapContext.getSymlink();
289 }
290
291 @Override
292 public Path getWorkingDirectory() throws IOException {
293 return mapContext.getWorkingDirectory();
294 }
295
296 @Override
297 public void progress() {
298 mapContext.progress();
299 }
300
301 @Override
302 public boolean getProfileEnabled() {
303 return mapContext.getProfileEnabled();
304 }
305
306 @Override
307 public String getProfileParams() {
308 return mapContext.getProfileParams();
309 }
310
311 @Override
312 public IntegerRanges getProfileTaskRange(boolean isMap) {
313 return mapContext.getProfileTaskRange(isMap);
314 }
315
316 @Override
317 public String getUser() {
318 return mapContext.getUser();
319 }
320
321 @Override
322 public Credentials getCredentials() {
323 return mapContext.getCredentials();
324 }
325
326 @Override
327 public float getProgress() {
328 return mapContext.getProgress();
329 }
330 }
331 }