/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.task.MapContextImpl;

/**
 * Maps input key/value pairs to a set of intermediate key/value pairs.
 *
 * <p>Maps are the individual tasks which transform input records into
 * intermediate records. The transformed intermediate records need not be of
 * the same type as the input records. A given input pair may map to zero or
 * many output pairs.</p>
 *
 * <p>The Hadoop Map-Reduce framework spawns one map task for each
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for
 * the job via {@link JobContext#getConfiguration()}.</p>
 *
 * <p>The framework first calls
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
 * for each key/value pair in the <code>InputSplit</code>. Finally
 * {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.</p>
 *
 * <p>All intermediate values associated with a given output key are
 * subsequently grouped by the framework, and passed to a {@link Reducer} to
 * determine the final output. Users can control the sorting and grouping by
 * specifying two key {@link RawComparator} classes.</p>
 *
 * <p>The <code>Mapper</code> outputs are partitioned per
 * <code>Reducer</code>. Users can control which keys (and hence records) go to
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.</p>
 *
 * <p>Users can optionally specify a <code>combiner</code>, via
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
 * intermediate outputs, which helps to cut down the amount of data transferred
 * from the <code>Mapper</code> to the <code>Reducer</code>.</p>
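 *
 * <p>As an illustrative sketch (not part of this class), a driver for the
 * <code>TokenCounterMapper</code> example below might reuse an
 * application-defined <code>IntSumReducer</code> as its combiner, which is
 * safe because integer summation is commutative and associative; the class
 * names here are assumptions, not Hadoop-supplied types:</p>
 * <p><blockquote><pre>
 * Job job = Job.getInstance(new Configuration(), "token count");
 * job.setMapperClass(TokenCounterMapper.class);
 * job.setCombinerClass(IntSumReducer.class); // local aggregation of map output
 * job.setReducerClass(IntSumReducer.class);
 * </pre></blockquote></p>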
 *
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 *
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 *
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper
 *     extends Mapper&lt;Object, Text, Text, IntWritable&gt; {
 *
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *
 *   public void map(Object key, Text value, Context context)
 *       throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote></p>
 *
 * <p>Applications may override the
 * {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert
 * greater control over map processing, e.g. for multi-threaded
 * <code>Mapper</code>s.</p>
 *
 * @see InputFormat
 * @see JobContext
 * @see Partitioner
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Mapper} implementations.
   */
  public abstract class Context
      implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, Context context)
      throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   *
   * @param context the <code>Context</code> for the task
   * @throws IOException if reading the input or writing the output fails
   * @throws InterruptedException if the task is interrupted
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}
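
/**
 * Illustrative sketch, not part of the Hadoop API: one way an application
 * could use the expert
 * {@link Mapper#run(org.apache.hadoop.mapreduce.Mapper.Context)} hook
 * described above, here wrapping the default record loop with simple
 * instrumentation. The class name and the counter group/name are
 * hypothetical; <code>getCounter(String, String)</code> is inherited from
 * {@link TaskAttemptContext}.
 */
class CountingMapperSketch<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      long records = 0;
      while (context.nextKeyValue()) {
        records++;
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
      // Report how many records this map task consumed (names are illustrative).
      context.getCounter("sketch", "MAP_RECORDS").increment(records);
    } finally {
      cleanup(context);
    }
  }
}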