/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;

/**
 * Maps input key/value pairs to a set of intermediate key/value pairs.
 *
 * <p>Maps are the individual tasks which transform input records into
 * intermediate records. The transformed intermediate records need not be of
 * the same type as the input records. A given input pair may map to zero or
 * many output pairs.</p>
 *
 * <p>The Hadoop Map-Reduce framework spawns one map task for each
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for
 * the job via {@link JobContext#getConfiguration()}.</p>
 *
 * <p>The framework first calls
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
 * for each key/value pair in the <code>InputSplit</code>. Finally,
 * {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.</p>
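 *
 * <p>For example, a minimal sketch of a <code>Mapper</code> that uses this
 * lifecycle to manage per-task state (the class name, the field and the
 * <code>example.min.length</code> property are illustrative, not part of the
 * framework):</p>
 * <blockquote><pre>
 * public class LengthFilterMapper
 *     extends Mapper&lt;LongWritable, Text, Text, Text&gt; {
 *
 *   private int minLength;
 *
 *   protected void setup(Context context) {
 *     // Read a job-level setting once, before any map() calls.
 *     minLength = context.getConfiguration().getInt("example.min.length", 1);
 *   }
 *
 *   public void map(LongWritable key, Text value, Context context)
 *       throws IOException, InterruptedException {
 *     if (value.getLength() &gt;= minLength) {
 *       context.write(value, value);
 *     }
 *   }
 * }
 * </pre></blockquote>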
 *
 * <p>All intermediate values associated with a given output key are
 * subsequently grouped by the framework, and passed to a {@link Reducer} to
 * determine the final output. Users can control the sorting and grouping by
 * specifying two key {@link RawComparator} classes.</p>
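 *
 * <p>For instance, assuming <code>MySortComparator</code> and
 * <code>MyGroupingComparator</code> are user-supplied {@link RawComparator}
 * implementations, they would be wired into the job roughly as follows:</p>
 * <blockquote><pre>
 * Job job = Job.getInstance(new Configuration(), "my job");
 * // Orders the intermediate keys before they reach the reducer.
 * job.setSortComparatorClass(MySortComparator.class);
 * // Decides which keys are grouped into a single reduce() call.
 * job.setGroupingComparatorClass(MyGroupingComparator.class);
 * </pre></blockquote>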
 *
 * <p>The <code>Mapper</code> outputs are partitioned per
 * <code>Reducer</code>. Users can control which keys (and hence records) go to
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.</p>
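 *
 * <p>As an illustration, a hash-based {@link Partitioner} (the class name is
 * illustrative) might look like:</p>
 * <blockquote><pre>
 * public class MyPartitioner extends Partitioner&lt;Text, IntWritable&gt; {
 *   public int getPartition(Text key, IntWritable value, int numPartitions) {
 *     // Mask off the sign bit so the partition index is non-negative.
 *     return (key.hashCode() &amp; Integer.MAX_VALUE) % numPartitions;
 *   }
 * }
 * // Registered on the job via job.setPartitionerClass(MyPartitioner.class);
 * </pre></blockquote>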
 *
 * <p>Users can optionally specify a <code>combiner</code>, via
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
 * intermediate outputs, which helps to cut down the amount of data transferred
 * from the <code>Mapper</code> to the <code>Reducer</code>.</p>
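 *
 * <p>For example, a word-count style job could reuse its reducer as the
 * combiner, assuming the reduce function (here called
 * <code>IntSumReducer</code>) is associative and commutative:</p>
 * <blockquote><pre>
 * job.setCombinerClass(IntSumReducer.class);
 * </pre></blockquote>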
 *
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
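 *
 * <p>For instance, map output compression can be enabled through the standard
 * configuration properties (shown here with the Snappy codec; any available
 * {@link CompressionCodec} may be substituted):</p>
 * <blockquote><pre>
 * Configuration conf = job.getConfiguration();
 * conf.setBoolean("mapreduce.map.output.compress", true);
 * conf.setClass("mapreduce.map.output.compress.codec",
 *               SnappyCodec.class, CompressionCodec.class);
 * </pre></blockquote>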
 *
 * <p>If the job has zero reduce tasks, the output of the <code>Mapper</code>
 * is written directly to the {@link OutputFormat} without sorting by keys.</p>
 *
 * <p>Example:</p>
 * <blockquote><pre>
 * public class TokenCounterMapper
 *     extends Mapper&lt;Object, Text, Text, IntWritable&gt; {
 *
 *   private static final IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *
 *   public void map(Object key, Text value, Context context)
 *       throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote>
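 *
 * <p>The mapper above would typically be registered in the job driver along
 * these lines (the job name is illustrative):</p>
 * <blockquote><pre>
 * Job job = Job.getInstance(new Configuration(), "token count");
 * job.setMapperClass(TokenCounterMapper.class);
 * job.setMapOutputKeyClass(Text.class);
 * job.setMapOutputValueClass(IntWritable.class);
 * </pre></blockquote>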
 *
 * <p>Applications may override the
 * {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert
 * greater control over map processing, e.g. to run multi-threaded
 * <code>Mapper</code>s.</p>
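 *
 * <p>As a minimal sketch (the <code>MAX_RECORDS</code> cap is illustrative),
 * an overridden run method can, for example, limit how many records a single
 * task consumes:</p>
 * <blockquote><pre>
 * // Illustrative cap on records processed per task.
 * private static final long MAX_RECORDS = 1_000_000L;
 *
 * public void run(Context context) throws IOException, InterruptedException {
 *   setup(context);
 *   try {
 *     long processed = 0;
 *     while (processed &lt; MAX_RECORDS &amp;&amp; context.nextKeyValue()) {
 *       map(context.getCurrentKey(), context.getCurrentValue(), context);
 *       processed++;
 *     }
 *   } finally {
 *     cleanup(context);
 *   }
 * }
 * </pre></blockquote>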
 *
 * @see InputFormat
 * @see JobContext
 * @see Partitioner
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Mapper} implementations.
   */
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // No-op by default; subclasses may override to initialize per-task state.
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // No-op by default; subclasses may override to release per-task resources.
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context the context for this task, giving access to the input
   *                split and to the output collector
   * @throws IOException if reading the input or writing the output fails
   * @throws InterruptedException if the task is interrupted
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}