001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.map; 020 021import java.io.IOException; 022import java.net.URI; 023 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.conf.Configuration.IntegerRanges; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.io.RawComparator; 030import org.apache.hadoop.mapreduce.Counter; 031import org.apache.hadoop.mapreduce.InputFormat; 032import org.apache.hadoop.mapreduce.InputSplit; 033import org.apache.hadoop.mapreduce.JobID; 034import org.apache.hadoop.mapreduce.MapContext; 035import org.apache.hadoop.mapreduce.Mapper; 036import org.apache.hadoop.mapreduce.OutputCommitter; 037import org.apache.hadoop.mapreduce.OutputFormat; 038import org.apache.hadoop.mapreduce.Partitioner; 039import org.apache.hadoop.mapreduce.Reducer; 040import org.apache.hadoop.mapreduce.TaskAttemptID; 041import org.apache.hadoop.security.Credentials; 042 043/** 044 * A {@link Mapper} which wraps a given one to allow custom 045 * {@link Mapper.Context} implementations. 046 */ 047@InterfaceAudience.Public 048@InterfaceStability.Evolving 049public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 050 extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { 051 052 /** 053 * Get a wrapped {@link Mapper.Context} for custom implementations. 054 * @param mapContext <code>MapContext</code> to be wrapped 055 * @return a wrapped <code>Mapper.Context</code> for custom implementations 056 */ 057 public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 058 getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) { 059 return new Context(mapContext); 060 } 061 062 @InterfaceStability.Evolving 063 public class Context 064 extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context { 065 066 protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext; 067 068 public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) { 069 this.mapContext = mapContext; 070 } 071 072 /** 073 * Get the input split for this map. 074 */ 075 public InputSplit getInputSplit() { 076 return mapContext.getInputSplit(); 077 } 078 079 @Override 080 public KEYIN getCurrentKey() throws IOException, InterruptedException { 081 return mapContext.getCurrentKey(); 082 } 083 084 @Override 085 public VALUEIN getCurrentValue() throws IOException, InterruptedException { 086 return mapContext.getCurrentValue(); 087 } 088 089 @Override 090 public boolean nextKeyValue() throws IOException, InterruptedException { 091 return mapContext.nextKeyValue(); 092 } 093 094 @Override 095 public Counter getCounter(Enum<?> counterName) { 096 return mapContext.getCounter(counterName); 097 } 098 099 @Override 100 public Counter getCounter(String groupName, String counterName) { 101 return mapContext.getCounter(groupName, counterName); 102 } 103 104 @Override 105 public OutputCommitter getOutputCommitter() { 106 return mapContext.getOutputCommitter(); 107 } 108 109 @Override 110 public void write(KEYOUT key, VALUEOUT value) throws IOException, 111 InterruptedException { 112 mapContext.write(key, value); 113 } 114 115 @Override 116 public String getStatus() { 117 return mapContext.getStatus(); 118 } 119 120 @Override 121 public TaskAttemptID getTaskAttemptID() { 122 return mapContext.getTaskAttemptID(); 123 } 124 125 @Override 126 public void setStatus(String msg) { 127 mapContext.setStatus(msg); 128 } 129 130 @Override 131 public Path[] getArchiveClassPaths() { 132 return mapContext.getArchiveClassPaths(); 133 } 134 135 @Override 136 public String[] getArchiveTimestamps() { 137 return mapContext.getArchiveTimestamps(); 138 } 139 140 @Override 141 public URI[] getCacheArchives() throws IOException { 142 return mapContext.getCacheArchives(); 143 } 144 145 @Override 146 public URI[] getCacheFiles() throws IOException { 147 return mapContext.getCacheFiles(); 148 } 149 150 @Override 151 public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() 152 throws ClassNotFoundException { 153 return mapContext.getCombinerClass(); 154 } 155 156 @Override 157 public Configuration getConfiguration() { 158 return mapContext.getConfiguration(); 159 } 160 161 @Override 162 public Path[] getFileClassPaths() { 163 return mapContext.getFileClassPaths(); 164 } 165 166 @Override 167 public String[] getFileTimestamps() { 168 return mapContext.getFileTimestamps(); 169 } 170 171 @Override 172 public RawComparator<?> getCombinerKeyGroupingComparator() { 173 return mapContext.getCombinerKeyGroupingComparator(); 174 } 175 176 @Override 177 public RawComparator<?> getGroupingComparator() { 178 return mapContext.getGroupingComparator(); 179 } 180 181 @Override 182 public Class<? extends InputFormat<?, ?>> getInputFormatClass() 183 throws ClassNotFoundException { 184 return mapContext.getInputFormatClass(); 185 } 186 187 @Override 188 public String getJar() { 189 return mapContext.getJar(); 190 } 191 192 @Override 193 public JobID getJobID() { 194 return mapContext.getJobID(); 195 } 196 197 @Override 198 public String getJobName() { 199 return mapContext.getJobName(); 200 } 201 202 @Override 203 public boolean getJobSetupCleanupNeeded() { 204 return mapContext.getJobSetupCleanupNeeded(); 205 } 206 207 @Override 208 public boolean getTaskCleanupNeeded() { 209 return mapContext.getTaskCleanupNeeded(); 210 } 211 212 @Override 213 public Path[] getLocalCacheArchives() throws IOException { 214 return mapContext.getLocalCacheArchives(); 215 } 216 217 @Override 218 public Path[] getLocalCacheFiles() throws IOException { 219 return mapContext.getLocalCacheFiles(); 220 } 221 222 @Override 223 public Class<?> getMapOutputKeyClass() { 224 return mapContext.getMapOutputKeyClass(); 225 } 226 227 @Override 228 public Class<?> getMapOutputValueClass() { 229 return mapContext.getMapOutputValueClass(); 230 } 231 232 @Override 233 public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() 234 throws ClassNotFoundException { 235 return mapContext.getMapperClass(); 236 } 237 238 @Override 239 public int getMaxMapAttempts() { 240 return mapContext.getMaxMapAttempts(); 241 } 242 243 @Override 244 public int getMaxReduceAttempts() { 245 return mapContext.getMaxReduceAttempts(); 246 } 247 248 @Override 249 public int getNumReduceTasks() { 250 return mapContext.getNumReduceTasks(); 251 } 252 253 @Override 254 public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() 255 throws ClassNotFoundException { 256 return mapContext.getOutputFormatClass(); 257 } 258 259 @Override 260 public Class<?> getOutputKeyClass() { 261 return mapContext.getOutputKeyClass(); 262 } 263 264 @Override 265 public Class<?> getOutputValueClass() { 266 return mapContext.getOutputValueClass(); 267 } 268 269 @Override 270 public Class<? extends Partitioner<?, ?>> getPartitionerClass() 271 throws ClassNotFoundException { 272 return mapContext.getPartitionerClass(); 273 } 274 275 @Override 276 public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() 277 throws ClassNotFoundException { 278 return mapContext.getReducerClass(); 279 } 280 281 @Override 282 public RawComparator<?> getSortComparator() { 283 return mapContext.getSortComparator(); 284 } 285 286 @Override 287 public boolean getSymlink() { 288 return mapContext.getSymlink(); 289 } 290 291 @Override 292 public Path getWorkingDirectory() throws IOException { 293 return mapContext.getWorkingDirectory(); 294 } 295 296 @Override 297 public void progress() { 298 mapContext.progress(); 299 } 300 301 @Override 302 public boolean getProfileEnabled() { 303 return mapContext.getProfileEnabled(); 304 } 305 306 @Override 307 public String getProfileParams() { 308 return mapContext.getProfileParams(); 309 } 310 311 @Override 312 public IntegerRanges getProfileTaskRange(boolean isMap) { 313 return mapContext.getProfileTaskRange(isMap); 314 } 315 316 @Override 317 public String getUser() { 318 return mapContext.getUser(); 319 } 320 321 @Override 322 public Credentials getCredentials() { 323 return mapContext.getCredentials(); 324 } 325 326 @Override 327 public float getProgress() { 328 return mapContext.getProgress(); 329 } 330 } 331}