001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.reduce;
020
021import java.io.IOException;
022import java.net.URI;
023
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.conf.Configuration.IntegerRanges;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.io.RawComparator;
030import org.apache.hadoop.mapreduce.Counter;
031import org.apache.hadoop.mapreduce.InputFormat;
032import org.apache.hadoop.mapreduce.JobID;
033import org.apache.hadoop.mapreduce.Mapper;
034import org.apache.hadoop.mapreduce.OutputCommitter;
035import org.apache.hadoop.mapreduce.OutputFormat;
036import org.apache.hadoop.mapreduce.Partitioner;
037import org.apache.hadoop.mapreduce.ReduceContext;
038import org.apache.hadoop.mapreduce.Reducer;
039import org.apache.hadoop.mapreduce.TaskAttemptID;
040import org.apache.hadoop.security.Credentials;
041
042/**
043 * A {@link Reducer} which wraps a given one to allow for custom 
044 * {@link Reducer.Context} implementations.
045 */
046@InterfaceAudience.Public
047@InterfaceStability.Evolving
048public class WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
049    extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
050
051  /**
052   * A a wrapped {@link Reducer.Context} for custom implementations.
053   * @param reduceContext <code>ReduceContext</code> to be wrapped
054   * @return a wrapped <code>Reducer.Context</code> for custom implementations
055   */
056  public Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 
057  getReducerContext(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext) {
058    return new Context(reduceContext);
059  }
060  
061  @InterfaceStability.Evolving
062  public class Context 
063      extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
064
065    protected ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext;
066
067    public Context(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext)
068    {
069      this.reduceContext = reduceContext; 
070    }
071
072    @Override
073    public KEYIN getCurrentKey() throws IOException, InterruptedException {
074      return reduceContext.getCurrentKey();
075    }
076
077    @Override
078    public VALUEIN getCurrentValue() throws IOException, InterruptedException {
079      return reduceContext.getCurrentValue();
080    }
081
082    @Override
083    public boolean nextKeyValue() throws IOException, InterruptedException {
084      return reduceContext.nextKeyValue();
085    }
086
087    @Override
088    public Counter getCounter(Enum counterName) {
089      return reduceContext.getCounter(counterName);
090    }
091
092    @Override
093    public Counter getCounter(String groupName, String counterName) {
094      return reduceContext.getCounter(groupName, counterName);
095    }
096
097    @Override
098    public OutputCommitter getOutputCommitter() {
099      return reduceContext.getOutputCommitter();
100    }
101
102    @Override
103    public void write(KEYOUT key, VALUEOUT value) throws IOException,
104        InterruptedException {
105      reduceContext.write(key, value);
106    }
107
108    @Override
109    public String getStatus() {
110      return reduceContext.getStatus();
111    }
112
113    @Override
114    public TaskAttemptID getTaskAttemptID() {
115      return reduceContext.getTaskAttemptID();
116    }
117
118    @Override
119    public void setStatus(String msg) {
120      reduceContext.setStatus(msg);
121    }
122
123    @Override
124    public Path[] getArchiveClassPaths() {
125      return reduceContext.getArchiveClassPaths();
126    }
127
128    @Override
129    public String[] getArchiveTimestamps() {
130      return reduceContext.getArchiveTimestamps();
131    }
132
133    @Override
134    public URI[] getCacheArchives() throws IOException {
135      return reduceContext.getCacheArchives();
136    }
137
138    @Override
139    public URI[] getCacheFiles() throws IOException {
140      return reduceContext.getCacheFiles();
141    }
142
143    @Override
144    public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
145        throws ClassNotFoundException {
146      return reduceContext.getCombinerClass();
147    }
148
149    @Override
150    public Configuration getConfiguration() {
151      return reduceContext.getConfiguration();
152    }
153
154    @Override
155    public Path[] getFileClassPaths() {
156      return reduceContext.getFileClassPaths();
157    }
158
159    @Override
160    public String[] getFileTimestamps() {
161      return reduceContext.getFileTimestamps();
162    }
163
164    @Override
165    public RawComparator<?> getCombinerKeyGroupingComparator() {
166      return reduceContext.getCombinerKeyGroupingComparator();
167    }
168
169    @Override
170    public RawComparator<?> getGroupingComparator() {
171      return reduceContext.getGroupingComparator();
172    }
173
174    @Override
175    public Class<? extends InputFormat<?, ?>> getInputFormatClass()
176        throws ClassNotFoundException {
177      return reduceContext.getInputFormatClass();
178    }
179
180    @Override
181    public String getJar() {
182      return reduceContext.getJar();
183    }
184
185    @Override
186    public JobID getJobID() {
187      return reduceContext.getJobID();
188    }
189
190    @Override
191    public String getJobName() {
192      return reduceContext.getJobName();
193    }
194
195    @Override
196    public boolean getJobSetupCleanupNeeded() {
197      return reduceContext.getJobSetupCleanupNeeded();
198    }
199
200    @Override
201    public boolean getTaskCleanupNeeded() {
202      return reduceContext.getTaskCleanupNeeded();
203    }
204
205    @Override
206    public Path[] getLocalCacheArchives() throws IOException {
207      return reduceContext.getLocalCacheArchives();
208    }
209
210    @Override
211    public Path[] getLocalCacheFiles() throws IOException {
212      return reduceContext.getLocalCacheFiles();
213    }
214
215    @Override
216    public Class<?> getMapOutputKeyClass() {
217      return reduceContext.getMapOutputKeyClass();
218    }
219
220    @Override
221    public Class<?> getMapOutputValueClass() {
222      return reduceContext.getMapOutputValueClass();
223    }
224
225    @Override
226    public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
227        throws ClassNotFoundException {
228      return reduceContext.getMapperClass();
229    }
230
231    @Override
232    public int getMaxMapAttempts() {
233      return reduceContext.getMaxMapAttempts();
234    }
235
236    @Override
237    public int getMaxReduceAttempts() {
238      return reduceContext.getMaxReduceAttempts();
239    }
240
241    @Override
242    public int getNumReduceTasks() {
243      return reduceContext.getNumReduceTasks();
244    }
245
246    @Override
247    public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
248        throws ClassNotFoundException {
249      return reduceContext.getOutputFormatClass();
250    }
251
252    @Override
253    public Class<?> getOutputKeyClass() {
254      return reduceContext.getOutputKeyClass();
255    }
256
257    @Override
258    public Class<?> getOutputValueClass() {
259      return reduceContext.getOutputValueClass();
260    }
261
262    @Override
263    public Class<? extends Partitioner<?, ?>> getPartitionerClass()
264        throws ClassNotFoundException {
265      return reduceContext.getPartitionerClass();
266    }
267
268    @Override
269    public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
270        throws ClassNotFoundException {
271      return reduceContext.getReducerClass();
272    }
273
274    @Override
275    public RawComparator<?> getSortComparator() {
276      return reduceContext.getSortComparator();
277    }
278
279    @Override
280    public boolean getSymlink() {
281      return reduceContext.getSymlink();
282    }
283
284    @Override
285    public Path getWorkingDirectory() throws IOException {
286      return reduceContext.getWorkingDirectory();
287    }
288
289    @Override
290    public void progress() {
291      reduceContext.progress();
292    }
293
294    @Override
295    public Iterable<VALUEIN> getValues() throws IOException,
296        InterruptedException {
297      return reduceContext.getValues();
298    }
299
300    @Override
301    public boolean nextKey() throws IOException, InterruptedException {
302      return reduceContext.nextKey();
303    }
304    
305    @Override
306    public boolean getProfileEnabled() {
307      return reduceContext.getProfileEnabled();
308    }
309
310    @Override
311    public String getProfileParams() {
312      return reduceContext.getProfileParams();
313    }
314
315    @Override
316    public IntegerRanges getProfileTaskRange(boolean isMap) {
317      return reduceContext.getProfileTaskRange(isMap);
318    }
319
320    @Override
321    public String getUser() {
322      return reduceContext.getUser();
323    }
324
325    @Override
326    public Credentials getCredentials() {
327      return reduceContext.getCredentials();
328    }
329    
330    @Override
331    public float getProgress() {
332      return reduceContext.getProgress();
333    }
334  }
335}