001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.reduce; 020 021import java.io.IOException; 022import java.net.URI; 023 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.conf.Configuration.IntegerRanges; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.io.RawComparator; 030import org.apache.hadoop.mapreduce.Counter; 031import org.apache.hadoop.mapreduce.InputFormat; 032import org.apache.hadoop.mapreduce.JobID; 033import org.apache.hadoop.mapreduce.Mapper; 034import org.apache.hadoop.mapreduce.OutputCommitter; 035import org.apache.hadoop.mapreduce.OutputFormat; 036import org.apache.hadoop.mapreduce.Partitioner; 037import org.apache.hadoop.mapreduce.ReduceContext; 038import org.apache.hadoop.mapreduce.Reducer; 039import org.apache.hadoop.mapreduce.TaskAttemptID; 040import org.apache.hadoop.security.Credentials; 041 042/** 043 * A {@link Reducer} which wraps a given one to allow for custom 044 * {@link Reducer.Context} implementations. 045 */ 046@InterfaceAudience.Public 047@InterfaceStability.Evolving 048public class WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 049 extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { 050 051 /** 052 * A a wrapped {@link Reducer.Context} for custom implementations. 053 * @param reduceContext <code>ReduceContext</code> to be wrapped 054 * @return a wrapped <code>Reducer.Context</code> for custom implementations 055 */ 056 public Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 057 getReducerContext(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext) { 058 return new Context(reduceContext); 059 } 060 061 @InterfaceStability.Evolving 062 public class Context 063 extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context { 064 065 protected ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext; 066 067 public Context(ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext) 068 { 069 this.reduceContext = reduceContext; 070 } 071 072 @Override 073 public KEYIN getCurrentKey() throws IOException, InterruptedException { 074 return reduceContext.getCurrentKey(); 075 } 076 077 @Override 078 public VALUEIN getCurrentValue() throws IOException, InterruptedException { 079 return reduceContext.getCurrentValue(); 080 } 081 082 @Override 083 public boolean nextKeyValue() throws IOException, InterruptedException { 084 return reduceContext.nextKeyValue(); 085 } 086 087 @Override 088 public Counter getCounter(Enum counterName) { 089 return reduceContext.getCounter(counterName); 090 } 091 092 @Override 093 public Counter getCounter(String groupName, String counterName) { 094 return reduceContext.getCounter(groupName, counterName); 095 } 096 097 @Override 098 public OutputCommitter getOutputCommitter() { 099 return reduceContext.getOutputCommitter(); 100 } 101 102 @Override 103 public void write(KEYOUT key, VALUEOUT value) throws IOException, 104 InterruptedException { 105 reduceContext.write(key, value); 106 } 107 108 @Override 109 public String getStatus() { 110 return reduceContext.getStatus(); 111 } 112 113 @Override 114 public TaskAttemptID getTaskAttemptID() { 115 return reduceContext.getTaskAttemptID(); 116 } 117 118 @Override 119 public void setStatus(String msg) { 120 reduceContext.setStatus(msg); 121 } 122 123 @Override 124 public Path[] getArchiveClassPaths() { 125 return reduceContext.getArchiveClassPaths(); 126 } 127 128 @Override 129 public String[] getArchiveTimestamps() { 130 return reduceContext.getArchiveTimestamps(); 131 } 132 133 @Override 134 public URI[] getCacheArchives() throws IOException { 135 return reduceContext.getCacheArchives(); 136 } 137 138 @Override 139 public URI[] getCacheFiles() throws IOException { 140 return reduceContext.getCacheArchives(); 141 } 142 143 @Override 144 public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() 145 throws ClassNotFoundException { 146 return reduceContext.getCombinerClass(); 147 } 148 149 @Override 150 public Configuration getConfiguration() { 151 return reduceContext.getConfiguration(); 152 } 153 154 @Override 155 public Path[] getFileClassPaths() { 156 return reduceContext.getFileClassPaths(); 157 } 158 159 @Override 160 public String[] getFileTimestamps() { 161 return reduceContext.getFileTimestamps(); 162 } 163 164 @Override 165 public RawComparator<?> getGroupingComparator() { 166 return reduceContext.getGroupingComparator(); 167 } 168 169 @Override 170 public Class<? extends InputFormat<?, ?>> getInputFormatClass() 171 throws ClassNotFoundException { 172 return reduceContext.getInputFormatClass(); 173 } 174 175 @Override 176 public String getJar() { 177 return reduceContext.getJar(); 178 } 179 180 @Override 181 public JobID getJobID() { 182 return reduceContext.getJobID(); 183 } 184 185 @Override 186 public String getJobName() { 187 return reduceContext.getJobName(); 188 } 189 190 @Override 191 public boolean getJobSetupCleanupNeeded() { 192 return reduceContext.getJobSetupCleanupNeeded(); 193 } 194 195 @Override 196 public boolean getTaskCleanupNeeded() { 197 return reduceContext.getTaskCleanupNeeded(); 198 } 199 200 @Override 201 public Path[] getLocalCacheArchives() throws IOException { 202 return reduceContext.getLocalCacheArchives(); 203 } 204 205 @Override 206 public Path[] getLocalCacheFiles() throws IOException { 207 return reduceContext.getLocalCacheFiles(); 208 } 209 210 @Override 211 public Class<?> getMapOutputKeyClass() { 212 return reduceContext.getMapOutputKeyClass(); 213 } 214 215 @Override 216 public Class<?> getMapOutputValueClass() { 217 return reduceContext.getMapOutputValueClass(); 218 } 219 220 @Override 221 public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() 222 throws ClassNotFoundException { 223 return reduceContext.getMapperClass(); 224 } 225 226 @Override 227 public int getMaxMapAttempts() { 228 return reduceContext.getMaxMapAttempts(); 229 } 230 231 @Override 232 public int getMaxReduceAttempts() { 233 return reduceContext.getMaxReduceAttempts(); 234 } 235 236 @Override 237 public int getNumReduceTasks() { 238 return reduceContext.getNumReduceTasks(); 239 } 240 241 @Override 242 public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() 243 throws ClassNotFoundException { 244 return reduceContext.getOutputFormatClass(); 245 } 246 247 @Override 248 public Class<?> getOutputKeyClass() { 249 return reduceContext.getOutputKeyClass(); 250 } 251 252 @Override 253 public Class<?> getOutputValueClass() { 254 return reduceContext.getOutputValueClass(); 255 } 256 257 @Override 258 public Class<? extends Partitioner<?, ?>> getPartitionerClass() 259 throws ClassNotFoundException { 260 return reduceContext.getPartitionerClass(); 261 } 262 263 @Override 264 public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() 265 throws ClassNotFoundException { 266 return reduceContext.getReducerClass(); 267 } 268 269 @Override 270 public RawComparator<?> getSortComparator() { 271 return reduceContext.getSortComparator(); 272 } 273 274 @Override 275 public boolean getSymlink() { 276 return reduceContext.getSymlink(); 277 } 278 279 @Override 280 public Path getWorkingDirectory() throws IOException { 281 return reduceContext.getWorkingDirectory(); 282 } 283 284 @Override 285 public void progress() { 286 reduceContext.progress(); 287 } 288 289 @Override 290 public Iterable<VALUEIN> getValues() throws IOException, 291 InterruptedException { 292 return reduceContext.getValues(); 293 } 294 295 @Override 296 public boolean nextKey() throws IOException, InterruptedException { 297 return reduceContext.nextKey(); 298 } 299 300 @Override 301 public boolean getProfileEnabled() { 302 return reduceContext.getProfileEnabled(); 303 } 304 305 @Override 306 public String getProfileParams() { 307 return reduceContext.getProfileParams(); 308 } 309 310 @Override 311 public IntegerRanges getProfileTaskRange(boolean isMap) { 312 return reduceContext.getProfileTaskRange(isMap); 313 } 314 315 @Override 316 public String getUser() { 317 return reduceContext.getUser(); 318 } 319 320 @Override 321 public Credentials getCredentials() { 322 return reduceContext.getCredentials(); 323 } 324 325 @Override 326 public float getProgress() { 327 return reduceContext.getProgress(); 328 } 329 } 330}