001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.io.RawComparator;
027    import org.apache.hadoop.io.compress.CompressionCodec;
028    import org.apache.hadoop.mapreduce.task.MapContextImpl;
029    
030    /** 
031     * Maps input key/value pairs to a set of intermediate key/value pairs.  
032     * 
 * <p>Maps are the individual tasks which transform input records into
 * intermediate records. The transformed intermediate records need not be of
 * the same type as the input records. A given input pair may map to zero or
 * many output pairs.</p>
037     * 
 * <p>The Hadoop Map-Reduce framework spawns one map task for each
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for
 * the job via {@link JobContext#getConfiguration()}.</p>
042     * 
 * <p>The framework first calls {@link #setup(Context)}, followed by
 * {@link #map(Object, Object, Context)} for each key/value pair in the
 * <code>InputSplit</code>. Finally, {@link #cleanup(Context)} is called; the
 * default {@link #run(Context)} calls it even if processing a record throws,
 * provided <code>setup</code> completed normally.</p>
048     * 
 * <p>All intermediate values associated with a given output key are
 * subsequently grouped by the framework and passed to a {@link Reducer} to
 * determine the final output. Users can control the sorting and grouping by
 * specifying two key {@link RawComparator} classes: one for sorting and one
 * for grouping.</p>
053     *
 * <p>The <code>Mapper</code> outputs are partitioned per
 * <code>Reducer</code>. Users can control which keys (and hence records) go to
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.</p>
057     * 
 * <p>Users can optionally specify a <code>combiner</code>, via
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
 * intermediate outputs, which helps to cut down the amount of data transferred
 * from the <code>Mapper</code> to the <code>Reducer</code>.</p>
062     * 
063     * <p>Applications can specify if and how the intermediate
064     * outputs are to be compressed and which {@link CompressionCodec}s are to be
065     * used via the <code>Configuration</code>.</p>
066     *  
 * <p>If the job has zero reduces, the output of the <code>Mapper</code> is
 * written directly to the {@link OutputFormat} without sorting by keys.</p>
070     * 
 * <p>Example:</p>
 * <blockquote><pre>
 * public class TokenCounterMapper
 *     extends Mapper&lt;Object, Text, Text, IntWritable&gt;{
 *
 *   private static final IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *
 *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote>
088     *
 * <p>Applications may override the {@link #run(Context)} method to exert
 * greater control over map processing, e.g. to implement multi-threaded
 * <code>Mapper</code>s.</p>
092     * 
093     * @see InputFormat
094     * @see JobContext
095     * @see Partitioner  
096     * @see Reducer
097     */
098    @InterfaceAudience.Public
099    @InterfaceStability.Stable
100    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
101    
102      /**
103       * The <code>Context</code> passed on to the {@link Mapper} implementations.
104       */
105      public abstract class Context
106        implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
107      }
108      
109      /**
110       * Called once at the beginning of the task.
111       */
112      protected void setup(Context context
113                           ) throws IOException, InterruptedException {
114        // NOTHING
115      }
116    
117      /**
118       * Called once for each key/value pair in the input split. Most applications
119       * should override this, but the default is the identity function.
120       */
121      @SuppressWarnings("unchecked")
122      protected void map(KEYIN key, VALUEIN value, 
123                         Context context) throws IOException, InterruptedException {
124        context.write((KEYOUT) key, (VALUEOUT) value);
125      }
126    
127      /**
128       * Called once at the end of the task.
129       */
130      protected void cleanup(Context context
131                             ) throws IOException, InterruptedException {
132        // NOTHING
133      }
134      
135      /**
136       * Expert users can override this method for more complete control over the
137       * execution of the Mapper.
138       * @param context
139       * @throws IOException
140       */
141      public void run(Context context) throws IOException, InterruptedException {
142        setup(context);
143        try {
144          while (context.nextKeyValue()) {
145            map(context.getCurrentKey(), context.getCurrentValue(), context);
146          }
147        } finally {
148          cleanup(context);
149        }
150      }
151    }