/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.task.MapContextImpl;

/**
 * Maps input key/value pairs to a set of intermediate key/value pairs.
 *
 * <p>Maps are the individual tasks which transform input records into
 * intermediate records. The transformed intermediate records need not be of
 * the same type as the input records. A given input pair may map to zero or
 * many output pairs.</p>
 *
 * <p>The Hadoop Map-Reduce framework spawns one map task for each
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * <code>Mapper</code> implementations can access the {@link Configuration} for
 * the job via {@link JobContext#getConfiguration()}.</p>
 *
 * <p>The framework first calls
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, Context)}
 * for each key/value pair in the <code>InputSplit</code>. Finally
 * {@link #cleanup(Context)} is called.</p>
 *
 * <p>All intermediate values associated with a given output key are
 * subsequently grouped by the framework, and passed to a {@link Reducer} to
 * determine the final output. Users can control the sorting and grouping by
 * specifying two key {@link RawComparator} classes.</p>
 *
 * <p>The <code>Mapper</code> outputs are partitioned per
 * <code>Reducer</code>. Users can control which keys (and hence records) go to
 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.</p>
 *
 * <p>Users can optionally specify a <code>combiner</code>, via
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
 * intermediate outputs, which helps to cut down the amount of data transferred
 * from the <code>Mapper</code> to the <code>Reducer</code>.</p>
 *
 * <p>Applications can specify if and how the intermediate
 * outputs are to be compressed and which {@link CompressionCodec}s are to be
 * used via the <code>Configuration</code>.</p>
 *
 * <p>If the job has zero
 * reduces then the output of the <code>Mapper</code> is directly written
 * to the {@link OutputFormat} without sorting by keys.</p>
 *
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class TokenCounterMapper
 *     extends Mapper&lt;Object, Text, Text, IntWritable&gt;{
 *
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *
 *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.write(word, one);
 *     }
 *   }
 * }
 * </pre></blockquote></p>
 *
 * <p>Applications may override the {@link #run(Context)} method to exert
 * greater control on map processing e.g. multi-threaded <code>Mapper</code>s
 * etc.</p>
 *
 * @see InputFormat
 * @see JobContext
 * @see Partitioner
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * The <code>Context</code> handed to {@link Mapper} implementations.
   */
  public abstract class Context
      implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  }

  /**
   * Called once at the beginning of the task. The default implementation
   * does nothing.
   *
   * @param context the context passed to this mapper
   * @throws IOException declared so that overriding implementations may throw it
   * @throws InterruptedException declared so that overriding implementations
   *         may throw it
   */
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // Intentionally empty: subclasses override to perform initialization.
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function: the input
   * pair is written to the context unchanged.
   *
   * @param key the input key
   * @param value the input value
   * @param context the context the output pair is written to
   * @throws IOException if writing to the context fails with it
   * @throws InterruptedException if writing to the context fails with it
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, Context context)
      throws IOException, InterruptedException {
    // Identity mapping: the input pair is re-typed as the output pair.
    final KEYOUT outputKey = (KEYOUT) key;
    final VALUEOUT outputValue = (VALUEOUT) value;
    context.write(outputKey, outputValue);
  }

  /**
   * Called once at the end of the task. The default implementation does
   * nothing.
   *
   * @param context the context passed to this mapper
   * @throws IOException declared so that overriding implementations may throw it
   * @throws InterruptedException declared so that overriding implementations
   *         may throw it
   */
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // Intentionally empty: subclasses override to release resources.
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   *
   * <p>This implementation calls {@link #setup(Context)} once, then calls
   * {@link #map(Object, Object, Context)} for every key/value pair the
   * context yields, and finally calls {@link #cleanup(Context)}. Once
   * <code>setup</code> has returned normally, <code>cleanup</code> is invoked
   * even if iterating or mapping throws.</p>
   *
   * @param context the context supplying the input pairs and receiving output
   * @throws IOException propagated from <code>setup</code>, <code>map</code>,
   *         <code>cleanup</code> or the context
   * @throws InterruptedException propagated from <code>setup</code>,
   *         <code>map</code>, <code>cleanup</code> or the context
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      for (boolean hasPair = context.nextKeyValue();
           hasPair;
           hasPair = context.nextKeyValue()) {
        final KEYIN currentKey = context.getCurrentKey();
        final VALUEIN currentValue = context.getCurrentValue();
        map(currentKey, currentValue, context);
      }
    } finally {
      // Runs whether or not the loop above completed normally.
      cleanup(context);
    }
  }
}