001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.reduce;
020
021 import java.io.IOException;
022 import java.net.URI;
023
024 import org.apache.hadoop.classification.InterfaceAudience;
025 import org.apache.hadoop.classification.InterfaceStability;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.conf.Configuration.IntegerRanges;
028 import org.apache.hadoop.fs.Path;
029 import org.apache.hadoop.io.RawComparator;
030 import org.apache.hadoop.mapreduce.Counter;
031 import org.apache.hadoop.mapreduce.InputFormat;
032 import org.apache.hadoop.mapreduce.JobID;
033 import org.apache.hadoop.mapreduce.Mapper;
034 import org.apache.hadoop.mapreduce.OutputCommitter;
035 import org.apache.hadoop.mapreduce.OutputFormat;
036 import org.apache.hadoop.mapreduce.Partitioner;
037 import org.apache.hadoop.mapreduce.ReduceContext;
038 import org.apache.hadoop.mapreduce.Reducer;
039 import org.apache.hadoop.mapreduce.TaskAttemptID;
040 import org.apache.hadoop.security.Credentials;
041
042 /**
043 * A {@link Reducer} which wraps a given one to allow for custom
044 * {@link Reducer.Context} implementations.
045 */
046 @InterfaceAudience.Public
047 @InterfaceStability.Evolving
048 public class WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
049 extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
050
051 /**
052 * A a wrapped {@link Reducer.Context} for custom implementations.
053 * @param reduceContext <code>ReduceContext</code> to be wrapped
054 * @return a wrapped <code>Reducer.Context</code> for custom implementations
055 */
056 public Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context
057 getReducerContext(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext) {
058 return new Context(reduceContext);
059 }
060
061 @InterfaceStability.Evolving
062 public class Context
063 extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
064
065 protected ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext;
066
067 public Context(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext)
068 {
069 this.reduceContext = reduceContext;
070 }
071
072 @Override
073 public KEYIN getCurrentKey() throws IOException, InterruptedException {
074 return reduceContext.getCurrentKey();
075 }
076
077 @Override
078 public VALUEIN getCurrentValue() throws IOException, InterruptedException {
079 return reduceContext.getCurrentValue();
080 }
081
082 @Override
083 public boolean nextKeyValue() throws IOException, InterruptedException {
084 return reduceContext.nextKeyValue();
085 }
086
087 @Override
088 public Counter getCounter(Enum counterName) {
089 return reduceContext.getCounter(counterName);
090 }
091
092 @Override
093 public Counter getCounter(String groupName, String counterName) {
094 return reduceContext.getCounter(groupName, counterName);
095 }
096
097 @Override
098 public OutputCommitter getOutputCommitter() {
099 return reduceContext.getOutputCommitter();
100 }
101
102 @Override
103 public void write(KEYOUT key, VALUEOUT value) throws IOException,
104 InterruptedException {
105 reduceContext.write(key, value);
106 }
107
108 @Override
109 public String getStatus() {
110 return reduceContext.getStatus();
111 }
112
113 @Override
114 public TaskAttemptID getTaskAttemptID() {
115 return reduceContext.getTaskAttemptID();
116 }
117
118 @Override
119 public void setStatus(String msg) {
120 reduceContext.setStatus(msg);
121 }
122
123 @Override
124 public Path[] getArchiveClassPaths() {
125 return reduceContext.getArchiveClassPaths();
126 }
127
128 @Override
129 public String[] getArchiveTimestamps() {
130 return reduceContext.getArchiveTimestamps();
131 }
132
133 @Override
134 public URI[] getCacheArchives() throws IOException {
135 return reduceContext.getCacheArchives();
136 }
137
138 @Override
139 public URI[] getCacheFiles() throws IOException {
140 return reduceContext.getCacheFiles();
141 }
142
143 @Override
144 public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
145 throws ClassNotFoundException {
146 return reduceContext.getCombinerClass();
147 }
148
149 @Override
150 public Configuration getConfiguration() {
151 return reduceContext.getConfiguration();
152 }
153
154 @Override
155 public Path[] getFileClassPaths() {
156 return reduceContext.getFileClassPaths();
157 }
158
159 @Override
160 public String[] getFileTimestamps() {
161 return reduceContext.getFileTimestamps();
162 }
163
164 @Override
165 public RawComparator<?> getCombinerKeyGroupingComparator() {
166 return reduceContext.getCombinerKeyGroupingComparator();
167 }
168
169 @Override
170 public RawComparator<?> getGroupingComparator() {
171 return reduceContext.getGroupingComparator();
172 }
173
174 @Override
175 public Class<? extends InputFormat<?, ?>> getInputFormatClass()
176 throws ClassNotFoundException {
177 return reduceContext.getInputFormatClass();
178 }
179
180 @Override
181 public String getJar() {
182 return reduceContext.getJar();
183 }
184
185 @Override
186 public JobID getJobID() {
187 return reduceContext.getJobID();
188 }
189
190 @Override
191 public String getJobName() {
192 return reduceContext.getJobName();
193 }
194
195 @Override
196 public boolean getJobSetupCleanupNeeded() {
197 return reduceContext.getJobSetupCleanupNeeded();
198 }
199
200 @Override
201 public boolean getTaskCleanupNeeded() {
202 return reduceContext.getTaskCleanupNeeded();
203 }
204
205 @Override
206 public Path[] getLocalCacheArchives() throws IOException {
207 return reduceContext.getLocalCacheArchives();
208 }
209
210 @Override
211 public Path[] getLocalCacheFiles() throws IOException {
212 return reduceContext.getLocalCacheFiles();
213 }
214
215 @Override
216 public Class<?> getMapOutputKeyClass() {
217 return reduceContext.getMapOutputKeyClass();
218 }
219
220 @Override
221 public Class<?> getMapOutputValueClass() {
222 return reduceContext.getMapOutputValueClass();
223 }
224
225 @Override
226 public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
227 throws ClassNotFoundException {
228 return reduceContext.getMapperClass();
229 }
230
231 @Override
232 public int getMaxMapAttempts() {
233 return reduceContext.getMaxMapAttempts();
234 }
235
236 @Override
237 public int getMaxReduceAttempts() {
238 return reduceContext.getMaxReduceAttempts();
239 }
240
241 @Override
242 public int getNumReduceTasks() {
243 return reduceContext.getNumReduceTasks();
244 }
245
246 @Override
247 public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
248 throws ClassNotFoundException {
249 return reduceContext.getOutputFormatClass();
250 }
251
252 @Override
253 public Class<?> getOutputKeyClass() {
254 return reduceContext.getOutputKeyClass();
255 }
256
257 @Override
258 public Class<?> getOutputValueClass() {
259 return reduceContext.getOutputValueClass();
260 }
261
262 @Override
263 public Class<? extends Partitioner<?, ?>> getPartitionerClass()
264 throws ClassNotFoundException {
265 return reduceContext.getPartitionerClass();
266 }
267
268 @Override
269 public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
270 throws ClassNotFoundException {
271 return reduceContext.getReducerClass();
272 }
273
274 @Override
275 public RawComparator<?> getSortComparator() {
276 return reduceContext.getSortComparator();
277 }
278
279 @Override
280 public boolean getSymlink() {
281 return reduceContext.getSymlink();
282 }
283
284 @Override
285 public Path getWorkingDirectory() throws IOException {
286 return reduceContext.getWorkingDirectory();
287 }
288
289 @Override
290 public void progress() {
291 reduceContext.progress();
292 }
293
294 @Override
295 public Iterable<VALUEIN> getValues() throws IOException,
296 InterruptedException {
297 return reduceContext.getValues();
298 }
299
300 @Override
301 public boolean nextKey() throws IOException, InterruptedException {
302 return reduceContext.nextKey();
303 }
304
305 @Override
306 public boolean getProfileEnabled() {
307 return reduceContext.getProfileEnabled();
308 }
309
310 @Override
311 public String getProfileParams() {
312 return reduceContext.getProfileParams();
313 }
314
315 @Override
316 public IntegerRanges getProfileTaskRange(boolean isMap) {
317 return reduceContext.getProfileTaskRange(isMap);
318 }
319
320 @Override
321 public String getUser() {
322 return reduceContext.getUser();
323 }
324
325 @Override
326 public Credentials getCredentials() {
327 return reduceContext.getCredentials();
328 }
329
330 @Override
331 public float getProgress() {
332 return reduceContext.getProgress();
333 }
334 }
335 }