001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.reduce; 020 021import java.io.IOException; 022import java.net.URI; 023 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.conf.Configuration.IntegerRanges; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.io.RawComparator; 030import org.apache.hadoop.mapreduce.Counter; 031import org.apache.hadoop.mapreduce.InputFormat; 032import org.apache.hadoop.mapreduce.JobID; 033import org.apache.hadoop.mapreduce.Mapper; 034import org.apache.hadoop.mapreduce.OutputCommitter; 035import org.apache.hadoop.mapreduce.OutputFormat; 036import org.apache.hadoop.mapreduce.Partitioner; 037import org.apache.hadoop.mapreduce.ReduceContext; 038import org.apache.hadoop.mapreduce.Reducer; 039import org.apache.hadoop.mapreduce.TaskAttemptID; 040import org.apache.hadoop.security.Credentials; 041 042/** 043 * A {@link Reducer} which wraps a given one to allow for custom 044 * {@link Reducer.Context} implementations. 045 */ 046@InterfaceAudience.Public 047@InterfaceStability.Evolving 048public class WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 049 extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { 050 051 /** 052 * A a wrapped {@link Reducer.Context} for custom implementations. 053 * @param reduceContext <code>ReduceContext</code> to be wrapped 054 * @return a wrapped <code>Reducer.Context</code> for custom implementations 055 */ 056 public Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 057 getReducerContext(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext) { 058 return new Context(reduceContext); 059 } 060 061 @InterfaceStability.Evolving 062 public class Context 063 extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context { 064 065 protected ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext; 066 067 public Context(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext) 068 { 069 this.reduceContext = reduceContext; 070 } 071 072 @Override 073 public KEYIN getCurrentKey() throws IOException, InterruptedException { 074 return reduceContext.getCurrentKey(); 075 } 076 077 @Override 078 public VALUEIN getCurrentValue() throws IOException, InterruptedException { 079 return reduceContext.getCurrentValue(); 080 } 081 082 @Override 083 public boolean nextKeyValue() throws IOException, InterruptedException { 084 return reduceContext.nextKeyValue(); 085 } 086 087 @Override 088 public Counter getCounter(Enum counterName) { 089 return reduceContext.getCounter(counterName); 090 } 091 092 @Override 093 public Counter getCounter(String groupName, String counterName) { 094 return reduceContext.getCounter(groupName, counterName); 095 } 096 097 @Override 098 public OutputCommitter getOutputCommitter() { 099 return reduceContext.getOutputCommitter(); 100 } 101 102 @Override 103 public void write(KEYOUT key, VALUEOUT value) throws IOException, 104 InterruptedException { 105 reduceContext.write(key, value); 106 } 107 108 @Override 109 public String getStatus() { 110 return reduceContext.getStatus(); 111 } 112 113 @Override 114 public TaskAttemptID getTaskAttemptID() { 115 return reduceContext.getTaskAttemptID(); 116 } 117 118 @Override 119 public void setStatus(String msg) { 120 reduceContext.setStatus(msg); 121 } 122 123 @Override 124 public Path[] getArchiveClassPaths() { 125 return reduceContext.getArchiveClassPaths(); 126 } 127 128 @Override 129 public String[] getArchiveTimestamps() { 130 return reduceContext.getArchiveTimestamps(); 131 } 132 133 @Override 134 public URI[] getCacheArchives() throws IOException { 135 return reduceContext.getCacheArchives(); 136 } 137 138 @Override 139 public URI[] getCacheFiles() throws IOException { 140 return reduceContext.getCacheFiles(); 141 } 142 143 @Override 144 public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() 145 throws ClassNotFoundException { 146 return reduceContext.getCombinerClass(); 147 } 148 149 @Override 150 public Configuration getConfiguration() { 151 return reduceContext.getConfiguration(); 152 } 153 154 @Override 155 public Path[] getFileClassPaths() { 156 return reduceContext.getFileClassPaths(); 157 } 158 159 @Override 160 public String[] getFileTimestamps() { 161 return reduceContext.getFileTimestamps(); 162 } 163 164 @Override 165 public RawComparator<?> getCombinerKeyGroupingComparator() { 166 return reduceContext.getCombinerKeyGroupingComparator(); 167 } 168 169 @Override 170 public RawComparator<?> getGroupingComparator() { 171 return reduceContext.getGroupingComparator(); 172 } 173 174 @Override 175 public Class<? extends InputFormat<?, ?>> getInputFormatClass() 176 throws ClassNotFoundException { 177 return reduceContext.getInputFormatClass(); 178 } 179 180 @Override 181 public String getJar() { 182 return reduceContext.getJar(); 183 } 184 185 @Override 186 public JobID getJobID() { 187 return reduceContext.getJobID(); 188 } 189 190 @Override 191 public String getJobName() { 192 return reduceContext.getJobName(); 193 } 194 195 @Override 196 public boolean getJobSetupCleanupNeeded() { 197 return reduceContext.getJobSetupCleanupNeeded(); 198 } 199 200 @Override 201 public boolean getTaskCleanupNeeded() { 202 return reduceContext.getTaskCleanupNeeded(); 203 } 204 205 @Override 206 public Path[] getLocalCacheArchives() throws IOException { 207 return reduceContext.getLocalCacheArchives(); 208 } 209 210 @Override 211 public Path[] getLocalCacheFiles() throws IOException { 212 return reduceContext.getLocalCacheFiles(); 213 } 214 215 @Override 216 public Class<?> getMapOutputKeyClass() { 217 return reduceContext.getMapOutputKeyClass(); 218 } 219 220 @Override 221 public Class<?> getMapOutputValueClass() { 222 return reduceContext.getMapOutputValueClass(); 223 } 224 225 @Override 226 public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() 227 throws ClassNotFoundException { 228 return reduceContext.getMapperClass(); 229 } 230 231 @Override 232 public int getMaxMapAttempts() { 233 return reduceContext.getMaxMapAttempts(); 234 } 235 236 @Override 237 public int getMaxReduceAttempts() { 238 return reduceContext.getMaxReduceAttempts(); 239 } 240 241 @Override 242 public int getNumReduceTasks() { 243 return reduceContext.getNumReduceTasks(); 244 } 245 246 @Override 247 public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() 248 throws ClassNotFoundException { 249 return reduceContext.getOutputFormatClass(); 250 } 251 252 @Override 253 public Class<?> getOutputKeyClass() { 254 return reduceContext.getOutputKeyClass(); 255 } 256 257 @Override 258 public Class<?> getOutputValueClass() { 259 return reduceContext.getOutputValueClass(); 260 } 261 262 @Override 263 public Class<? extends Partitioner<?, ?>> getPartitionerClass() 264 throws ClassNotFoundException { 265 return reduceContext.getPartitionerClass(); 266 } 267 268 @Override 269 public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() 270 throws ClassNotFoundException { 271 return reduceContext.getReducerClass(); 272 } 273 274 @Override 275 public RawComparator<?> getSortComparator() { 276 return reduceContext.getSortComparator(); 277 } 278 279 @Override 280 public boolean getSymlink() { 281 return reduceContext.getSymlink(); 282 } 283 284 @Override 285 public Path getWorkingDirectory() throws IOException { 286 return reduceContext.getWorkingDirectory(); 287 } 288 289 @Override 290 public void progress() { 291 reduceContext.progress(); 292 } 293 294 @Override 295 public Iterable<VALUEIN> getValues() throws IOException, 296 InterruptedException { 297 return reduceContext.getValues(); 298 } 299 300 @Override 301 public boolean nextKey() throws IOException, InterruptedException { 302 return reduceContext.nextKey(); 303 } 304 305 @Override 306 public boolean getProfileEnabled() { 307 return reduceContext.getProfileEnabled(); 308 } 309 310 @Override 311 public String getProfileParams() { 312 return reduceContext.getProfileParams(); 313 } 314 315 @Override 316 public IntegerRanges getProfileTaskRange(boolean isMap) { 317 return reduceContext.getProfileTaskRange(isMap); 318 } 319 320 @Override 321 public String getUser() { 322 return reduceContext.getUser(); 323 } 324 325 @Override 326 public Credentials getCredentials() { 327 return reduceContext.getCredentials(); 328 } 329 330 @Override 331 public float getProgress() { 332 return reduceContext.getProgress(); 333 } 334 } 335}