/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;

/**
 * Maps input key/value pairs to a set of intermediate key/value pairs.
 *
 * <p>Maps are the individual tasks which transform input records into
 * intermediate records. The transformed intermediate records need not be of
 * the same type as the input records. A given input pair may map to zero or
 * many output pairs.</p>
 *
 * <p>The Hadoop Map-Reduce framework spawns one map task for each
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for
 * the job via {@link JobContext#getConfiguration()}.</p>
 *
043 * <p>The framework first calls
044 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
045 * {@link #map(Object, Object, Context)}
046 * for each key/value pair in the <code>InputSplit</code>. Finally
047 * {@link #cleanup(Context)} is called.</p>
048 *
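 * <p>For illustration only, a <code>Mapper</code> might use
 * <code>setup</code> to read a per-job flag once, before any calls to
 * <code>map</code>; the <code>example.lowercase</code> property and the class
 * name below are hypothetical:</p>
 * <p><blockquote><pre>
 * public class LowercaseMapper
 *     extends Mapper&lt;LongWritable, Text, Text, Text&gt; {
 *
 *   private boolean lowercase;
 *
 *   protected void setup(Context context) {
 *     // read the per-job flag once, before any map() calls
 *     lowercase = context.getConfiguration().getBoolean("example.lowercase", false);
 *   }
 *
 *   public void map(LongWritable key, Text value, Context context)
 *       throws IOException, InterruptedException {
 *     String line = value.toString();
 *     context.write(new Text(lowercase ? line.toLowerCase() : line), value);
 *   }
 * }
 * </pre></blockquote></p>
 *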
049 * <p>All intermediate values associated with a given output key are
050 * subsequently grouped by the framework, and passed to a {@link Reducer} to
051 * determine the final output. Users can control the sorting and grouping by
052 * specifying two key {@link RawComparator} classes.</p>
053 *
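 * <p>For example, assuming <code>MySortComparator</code> and
 * <code>MyGroupingComparator</code> are user-supplied {@link RawComparator}
 * implementations, they might be wired up in the driver as follows:</p>
 * <p><blockquote><pre>
 * job.setSortComparatorClass(MySortComparator.class);         // order within a partition
 * job.setGroupingComparatorClass(MyGroupingComparator.class); // keys grouped per reduce call
 * </pre></blockquote></p>
 *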
054 * <p>The <code>Mapper</code> outputs are partitioned per
055 * <code>Reducer</code>. Users can control which keys (and hence records) go to
056 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
057 *
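 * <p>A minimal sketch of a custom {@link Partitioner} (the class name is
 * illustrative) that routes records by key hash:</p>
 * <p><blockquote><pre>
 * public class HashModPartitioner extends Partitioner&lt;Text, IntWritable&gt; {
 *   public int getPartition(Text key, IntWritable value, int numPartitions) {
 *     // mask the sign bit so the partition index is non-negative
 *     return (key.hashCode() &amp; Integer.MAX_VALUE) % numPartitions;
 *   }
 * }
 * </pre></blockquote></p>
 * <p>It would be registered via {@link Job#setPartitionerClass(Class)}.</p>
 *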
058 * <p>Users can optionally specify a <code>combiner</code>, via
059 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
060 * intermediate outputs, which helps to cut down the amount of data transferred
061 * from the <code>Mapper</code> to the <code>Reducer</code>.
062 *
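 * <p>For example, when the reduce function is commutative and associative
 * (as in a word count), the reducer class itself can often double as the
 * combiner:</p>
 * <p><blockquote><pre>
 * job.setCombinerClass(IntSumReducer.class);
 * </pre></blockquote></p>
 *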
063 * <p>Applications can specify if and how the intermediate
064 * outputs are to be compressed and which {@link CompressionCodec}s are to be
065 * used via the <code>Configuration</code>.</p>
066 *
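 * <p>For instance, map output compression might be enabled in the driver as
 * below; the property names are those used by Hadoop 2.x, and
 * <code>SnappyCodec</code> requires native Snappy support:</p>
 * <p><blockquote><pre>
 * conf.setBoolean("mapreduce.map.output.compress", true);
 * conf.setClass("mapreduce.map.output.compress.codec",
 *               SnappyCodec.class, CompressionCodec.class);
 * </pre></blockquote></p>
 *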
067 * <p>If the job has zero
068 * reduces then the output of the <code>Mapper</code> is directly written
069 * to the {@link OutputFormat} without sorting by keys.</p>
070 *
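 * <p>For a map-only job this is requested in the driver:</p>
 * <p><blockquote><pre>
 * job.setNumReduceTasks(0);
 * </pre></blockquote></p>
 *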
071 * <p>Example:</p>
072 * <p><blockquote><pre>
073 * public class TokenCounterMapper
074 * extends Mapper<Object, Text, Text, IntWritable>{
075 *
076 * private final static IntWritable one = new IntWritable(1);
077 * private Text word = new Text();
078 *
079 * public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
080 * StringTokenizer itr = new StringTokenizer(value.toString());
081 * while (itr.hasMoreTokens()) {
082 * word.set(itr.nextToken());
083 * context.write(word, one);
084 * }
085 * }
086 * }
087 * </pre></blockquote></p>
088 *
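 * <p>A minimal driver for the example above might look like the following
 * (input/output path handling and the reducer are omitted):</p>
 * <p><blockquote><pre>
 * Job job = Job.getInstance(new Configuration(), "token count");
 * job.setJarByClass(TokenCounterMapper.class);
 * job.setMapperClass(TokenCounterMapper.class);
 * job.setOutputKeyClass(Text.class);
 * job.setOutputValueClass(IntWritable.class);
 * </pre></blockquote></p>
 *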
089 * <p>Applications may override the {@link #run(Context)} method to exert
090 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s
091 * etc.</p>
092 *
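 * <p>As a sketch, an override of <code>run</code> might follow the default
 * loop but report progress periodically; the bundled
 * {@link org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper} is a more
 * elaborate example that feeds records to a pool of worker threads:</p>
 * <p><blockquote><pre>
 * public void run(Context context) throws IOException, InterruptedException {
 *   setup(context);
 *   try {
 *     long processed = 0;
 *     while (context.nextKeyValue()) {
 *       map(context.getCurrentKey(), context.getCurrentValue(), context);
 *       if (++processed % 10000 == 0) {
 *         context.setStatus("processed " + processed + " records");
 *       }
 *     }
 *   } finally {
 *     cleanup(context);
 *   }
 * }
 * </pre></blockquote></p>
 *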
 * @see InputFormat
 * @see JobContext
 * @see Partitioner
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Mapper} implementations.
   */
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the <code>Mapper</code>.
   * @param context the <code>Context</code> for the task, giving access to the
   *                input split and to the output collector
   * @throws IOException if reading the input or writing the output fails
   * @throws InterruptedException if the task is interrupted
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}