001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.reduce;
020    
021    import java.io.IOException;
022    import java.net.URI;
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.conf.Configuration.IntegerRanges;
028    import org.apache.hadoop.fs.Path;
029    import org.apache.hadoop.io.RawComparator;
030    import org.apache.hadoop.mapreduce.Counter;
031    import org.apache.hadoop.mapreduce.InputFormat;
032    import org.apache.hadoop.mapreduce.JobID;
033    import org.apache.hadoop.mapreduce.Mapper;
034    import org.apache.hadoop.mapreduce.OutputCommitter;
035    import org.apache.hadoop.mapreduce.OutputFormat;
036    import org.apache.hadoop.mapreduce.Partitioner;
037    import org.apache.hadoop.mapreduce.ReduceContext;
038    import org.apache.hadoop.mapreduce.Reducer;
039    import org.apache.hadoop.mapreduce.TaskAttemptID;
040    import org.apache.hadoop.security.Credentials;
041    
042    /**
043     * A {@link Reducer} which wraps a given one to allow for custom 
044     * {@link Reducer.Context} implementations.
045     */
046    @InterfaceAudience.Public
047    @InterfaceStability.Evolving
048    public class WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
049        extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
050    
051      /**
052       * A a wrapped {@link Reducer.Context} for custom implementations.
053       * @param reduceContext <code>ReduceContext</code> to be wrapped
054       * @return a wrapped <code>Reducer.Context</code> for custom implementations
055       */
056      public Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 
057      getReducerContext(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext) {
058        return new Context(reduceContext);
059      }
060      
061      @InterfaceStability.Evolving
062      public class Context 
063          extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
064    
065        protected ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext;
066    
067        public Context(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext)
068        {
069          this.reduceContext = reduceContext; 
070        }
071    
072        @Override
073        public KEYIN getCurrentKey() throws IOException, InterruptedException {
074          return reduceContext.getCurrentKey();
075        }
076    
077        @Override
078        public VALUEIN getCurrentValue() throws IOException, InterruptedException {
079          return reduceContext.getCurrentValue();
080        }
081    
082        @Override
083        public boolean nextKeyValue() throws IOException, InterruptedException {
084          return reduceContext.nextKeyValue();
085        }
086    
087        @Override
088        public Counter getCounter(Enum counterName) {
089          return reduceContext.getCounter(counterName);
090        }
091    
092        @Override
093        public Counter getCounter(String groupName, String counterName) {
094          return reduceContext.getCounter(groupName, counterName);
095        }
096    
097        @Override
098        public OutputCommitter getOutputCommitter() {
099          return reduceContext.getOutputCommitter();
100        }
101    
102        @Override
103        public void write(KEYOUT key, VALUEOUT value) throws IOException,
104            InterruptedException {
105          reduceContext.write(key, value);
106        }
107    
108        @Override
109        public String getStatus() {
110          return reduceContext.getStatus();
111        }
112    
113        @Override
114        public TaskAttemptID getTaskAttemptID() {
115          return reduceContext.getTaskAttemptID();
116        }
117    
118        @Override
119        public void setStatus(String msg) {
120          reduceContext.setStatus(msg);
121        }
122    
123        @Override
124        public Path[] getArchiveClassPaths() {
125          return reduceContext.getArchiveClassPaths();
126        }
127    
128        @Override
129        public String[] getArchiveTimestamps() {
130          return reduceContext.getArchiveTimestamps();
131        }
132    
133        @Override
134        public URI[] getCacheArchives() throws IOException {
135          return reduceContext.getCacheArchives();
136        }
137    
138        @Override
139        public URI[] getCacheFiles() throws IOException {
140          return reduceContext.getCacheFiles();
141        }
142    
143        @Override
144        public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
145            throws ClassNotFoundException {
146          return reduceContext.getCombinerClass();
147        }
148    
149        @Override
150        public Configuration getConfiguration() {
151          return reduceContext.getConfiguration();
152        }
153    
154        @Override
155        public Path[] getFileClassPaths() {
156          return reduceContext.getFileClassPaths();
157        }
158    
159        @Override
160        public String[] getFileTimestamps() {
161          return reduceContext.getFileTimestamps();
162        }
163    
164        @Override
165        public RawComparator<?> getCombinerKeyGroupingComparator() {
166          return reduceContext.getCombinerKeyGroupingComparator();
167        }
168    
169        @Override
170        public RawComparator<?> getGroupingComparator() {
171          return reduceContext.getGroupingComparator();
172        }
173    
174        @Override
175        public Class<? extends InputFormat<?, ?>> getInputFormatClass()
176            throws ClassNotFoundException {
177          return reduceContext.getInputFormatClass();
178        }
179    
180        @Override
181        public String getJar() {
182          return reduceContext.getJar();
183        }
184    
185        @Override
186        public JobID getJobID() {
187          return reduceContext.getJobID();
188        }
189    
190        @Override
191        public String getJobName() {
192          return reduceContext.getJobName();
193        }
194    
195        @Override
196        public boolean getJobSetupCleanupNeeded() {
197          return reduceContext.getJobSetupCleanupNeeded();
198        }
199    
200        @Override
201        public boolean getTaskCleanupNeeded() {
202          return reduceContext.getTaskCleanupNeeded();
203        }
204    
205        @Override
206        public Path[] getLocalCacheArchives() throws IOException {
207          return reduceContext.getLocalCacheArchives();
208        }
209    
210        @Override
211        public Path[] getLocalCacheFiles() throws IOException {
212          return reduceContext.getLocalCacheFiles();
213        }
214    
215        @Override
216        public Class<?> getMapOutputKeyClass() {
217          return reduceContext.getMapOutputKeyClass();
218        }
219    
220        @Override
221        public Class<?> getMapOutputValueClass() {
222          return reduceContext.getMapOutputValueClass();
223        }
224    
225        @Override
226        public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
227            throws ClassNotFoundException {
228          return reduceContext.getMapperClass();
229        }
230    
231        @Override
232        public int getMaxMapAttempts() {
233          return reduceContext.getMaxMapAttempts();
234        }
235    
236        @Override
237        public int getMaxReduceAttempts() {
238          return reduceContext.getMaxReduceAttempts();
239        }
240    
241        @Override
242        public int getNumReduceTasks() {
243          return reduceContext.getNumReduceTasks();
244        }
245    
246        @Override
247        public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
248            throws ClassNotFoundException {
249          return reduceContext.getOutputFormatClass();
250        }
251    
252        @Override
253        public Class<?> getOutputKeyClass() {
254          return reduceContext.getOutputKeyClass();
255        }
256    
257        @Override
258        public Class<?> getOutputValueClass() {
259          return reduceContext.getOutputValueClass();
260        }
261    
262        @Override
263        public Class<? extends Partitioner<?, ?>> getPartitionerClass()
264            throws ClassNotFoundException {
265          return reduceContext.getPartitionerClass();
266        }
267    
268        @Override
269        public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
270            throws ClassNotFoundException {
271          return reduceContext.getReducerClass();
272        }
273    
274        @Override
275        public RawComparator<?> getSortComparator() {
276          return reduceContext.getSortComparator();
277        }
278    
279        @Override
280        public boolean getSymlink() {
281          return reduceContext.getSymlink();
282        }
283    
284        @Override
285        public Path getWorkingDirectory() throws IOException {
286          return reduceContext.getWorkingDirectory();
287        }
288    
289        @Override
290        public void progress() {
291          reduceContext.progress();
292        }
293    
294        @Override
295        public Iterable<VALUEIN> getValues() throws IOException,
296            InterruptedException {
297          return reduceContext.getValues();
298        }
299    
300        @Override
301        public boolean nextKey() throws IOException, InterruptedException {
302          return reduceContext.nextKey();
303        }
304        
305        @Override
306        public boolean getProfileEnabled() {
307          return reduceContext.getProfileEnabled();
308        }
309    
310        @Override
311        public String getProfileParams() {
312          return reduceContext.getProfileParams();
313        }
314    
315        @Override
316        public IntegerRanges getProfileTaskRange(boolean isMap) {
317          return reduceContext.getProfileTaskRange(isMap);
318        }
319    
320        @Override
321        public String getUser() {
322          return reduceContext.getUser();
323        }
324    
325        @Override
326        public Credentials getCredentials() {
327          return reduceContext.getCredentials();
328        }
329        
330        @Override
331        public float getProgress() {
332          return reduceContext.getProgress();
333        }
334      }
335    }