001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.fs.FileSystem;
026    import org.apache.hadoop.io.Closeable;
027    import org.apache.hadoop.io.SequenceFile;
028    import org.apache.hadoop.io.compress.CompressionCodec;
029    
030    /** 
031     * Maps input key/value pairs to a set of intermediate key/value pairs.  
032     * 
033     * <p>Maps are the individual tasks which transform input records into
034     * intermediate records. The transformed intermediate records need not be of
035     * the same type as the input records. A given input pair may map to zero or 
036     * many output pairs.</p> 
037     * 
038     * <p>The Hadoop Map-Reduce framework spawns one map task for each 
039     * {@link InputSplit} generated by the {@link InputFormat} for the job.
040     * <code>Mapper</code> implementations can access the {@link JobConf} for the 
041     * job via the {@link JobConfigurable#configure(JobConf)} and initialize
042     * themselves. Similarly they can use the {@link Closeable#close()} method for
043     * de-initialization.</p>
044     * 
045     * <p>The framework then calls 
046     * {@link #map(Object, Object, OutputCollector, Reporter)} 
047     * for each key/value pair in the <code>InputSplit</code> for that task.</p>
048     * 
049     * <p>All intermediate values associated with a given output key are 
050     * subsequently grouped by the framework, and passed to a {@link Reducer} to  
051     * determine the final output. Users can control the grouping by specifying
052     * a <code>Comparator</code> via 
053     * {@link JobConf#setOutputKeyComparatorClass(Class)}.</p>
054     *
055     * <p>The grouped <code>Mapper</code> outputs are partitioned per 
056     * <code>Reducer</code>. Users can control which keys (and hence records) go to 
057     * which <code>Reducer</code> by implementing a custom {@link Partitioner}.</p>
058     * 
059     * <p>Users can optionally specify a <code>combiner</code>, via 
060     * {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the 
061     * intermediate outputs, which helps to cut down the amount of data transferred 
062     * from the <code>Mapper</code> to the <code>Reducer</code>.</p>
063     * 
064     * <p>The intermediate, grouped outputs are always stored in 
065     * {@link SequenceFile}s. Applications can specify if and how the intermediate
066     * outputs are to be compressed and which {@link CompressionCodec}s are to be
067     * used via the <code>JobConf</code>.</p>
068     *  
069     * <p>If the job has 
070     * <a href="{@docRoot}/org/apache/hadoop/mapred/JobConf.html#ReducerNone">zero
071     * reduces</a> then the output of the <code>Mapper</code> is directly written
072     * to the {@link FileSystem} without grouping by keys.</p>
073     * 
074     * <p>Example:</p>
075     * <p><blockquote><pre>
076     *     public class MyMapper&lt;K extends WritableComparable, V extends Writable&gt;
077     *     extends MapReduceBase implements Mapper&lt;K, V, K, V&gt; {
078     *
079     *       static enum MyCounters { NUM_RECORDS }
080     *
081     *       private String mapTaskId;
082     *       private String inputFile;
083     *       private int noRecords = 0;
084     *
085     *       public void configure(JobConf job) {
086     *         mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
087     *         inputFile = job.get(JobContext.MAP_INPUT_FILE);
088     *       }
089     *
090     *       public void map(K key, V val,
091     *                       OutputCollector&lt;K, V&gt; output, Reporter reporter)
092     *       throws IOException {
093     *         // Process the &lt;key, value&gt; pair (assume this takes a while)
094     *         // ...
095     *         // ...
096     *
097     *         // Let the framework know that we are alive, and kicking!
098     *         // reporter.progress();
099     *
100     *         // Process some more
101     *         // ...
102     *         // ...
103     *
104     *         // Increment the no. of &lt;key, value&gt; pairs processed
105     *         ++noRecords;
106     *
107     *         // Increment counters
108     *         reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
109     *        
110     *         // Every 100 records update application-level status
111     *         if ((noRecords%100) == 0) {
112     *           reporter.setStatus(mapTaskId + " processed " + noRecords + 
113     *                              " from input-file: " + inputFile); 
114     *         }
115     *         
116     *         // Output the result
117     *         output.collect(key, val);
118     *       }
119     *     }
120     * </pre></blockquote></p>
121     *
122     * <p>Applications may write a custom {@link MapRunnable} to exert greater
123     * control on map processing e.g. multi-threaded <code>Mapper</code>s etc.</p>
124     * 
125     * @see JobConf
126     * @see InputFormat
127     * @see Partitioner  
128     * @see Reducer
129     * @see MapReduceBase
130     * @see MapRunnable
131     * @see SequenceFile
132     */
133    @InterfaceAudience.Public
134    @InterfaceStability.Stable
135    public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
136      
137      /**
138       * Maps a single input key/value pair into intermediate key/value pairs.
139       *
140       * <p>Output pairs need not be of the same types as input pairs.  A given
141       * input pair may map to zero or many output pairs.  Output pairs are
142       * collected with calls to {@link OutputCollector#collect(Object,Object)}.</p>
143       *
144       * <p>Applications can use the {@link Reporter} provided to report progress
145       * or just indicate that they are alive. In scenarios where the application
146       * takes a significant amount of time to process individual key/value
147       * pairs, this is crucial since the framework might assume that the task has
148       * timed-out and kill that task. The other way of avoiding this is to set
149       * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
150       * mapreduce.task.timeout</a> to a high-enough value (or even zero for no
151       * time-outs).</p>
152       *
153       * @param key the input key.
154       * @param value the input value.
155       * @param output collects mapped keys and values.
156       * @param reporter facility to report progress.
157       * @throws IOException if the implementation encounters an I/O error.
158       */
159      void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
160      throws IOException;
161    }