001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.fs.FileSystem;
026 import org.apache.hadoop.io.Closeable;
027 import org.apache.hadoop.io.SequenceFile;
028 import org.apache.hadoop.io.compress.CompressionCodec;
029
030 /**
031 * Maps input key/value pairs to a set of intermediate key/value pairs.
032 *
033 * <p>Maps are the individual tasks which transform input records into a
034 * intermediate records. The transformed intermediate records need not be of
035 * the same type as the input records. A given input pair may map to zero or
036 * many output pairs.</p>
037 *
038 * <p>The Hadoop Map-Reduce framework spawns one map task for each
039 * {@link InputSplit} generated by the {@link InputFormat} for the job.
040 * <code>Mapper</code> implementations can access the {@link JobConf} for the
041 * job via the {@link JobConfigurable#configure(JobConf)} and initialize
042 * themselves. Similarly they can use the {@link Closeable#close()} method for
043 * de-initialization.</p>
044 *
045 * <p>The framework then calls
046 * {@link #map(Object, Object, OutputCollector, Reporter)}
047 * for each key/value pair in the <code>InputSplit</code> for that task.</p>
048 *
049 * <p>All intermediate values associated with a given output key are
050 * subsequently grouped by the framework, and passed to a {@link Reducer} to
051 * determine the final output. Users can control the grouping by specifying
052 * a <code>Comparator</code> via
053 * {@link JobConf#setOutputKeyComparatorClass(Class)}.</p>
054 *
055 * <p>The grouped <code>Mapper</code> outputs are partitioned per
056 * <code>Reducer</code>. Users can control which keys (and hence records) go to
057 * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
058 *
059 * <p>Users can optionally specify a <code>combiner</code>, via
060 * {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the
061 * intermediate outputs, which helps to cut down the amount of data transferred
062 * from the <code>Mapper</code> to the <code>Reducer</code>.
063 *
064 * <p>The intermediate, grouped outputs are always stored in
065 * {@link SequenceFile}s. Applications can specify if and how the intermediate
066 * outputs are to be compressed and which {@link CompressionCodec}s are to be
067 * used via the <code>JobConf</code>.</p>
068 *
069 * <p>If the job has
070 * <a href="{@docRoot}/org/apache/hadoop/mapred/JobConf.html#ReducerNone">zero
071 * reduces</a> then the output of the <code>Mapper</code> is directly written
072 * to the {@link FileSystem} without grouping by keys.</p>
073 *
074 * <p>Example:</p>
075 * <p><blockquote><pre>
076 * public class MyMapper<K extends WritableComparable, V extends Writable>
077 * extends MapReduceBase implements Mapper<K, V, K, V> {
078 *
079 * static enum MyCounters { NUM_RECORDS }
080 *
081 * private String mapTaskId;
082 * private String inputFile;
083 * private int noRecords = 0;
084 *
085 * public void configure(JobConf job) {
086 * mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
087 * inputFile = job.get(JobContext.MAP_INPUT_FILE);
088 * }
089 *
090 * public void map(K key, V val,
091 * OutputCollector<K, V> output, Reporter reporter)
092 * throws IOException {
093 * // Process the <key, value> pair (assume this takes a while)
094 * // ...
095 * // ...
096 *
097 * // Let the framework know that we are alive, and kicking!
098 * // reporter.progress();
099 *
100 * // Process some more
101 * // ...
102 * // ...
103 *
104 * // Increment the no. of <key, value> pairs processed
105 * ++noRecords;
106 *
107 * // Increment counters
108 * reporter.incrCounter(NUM_RECORDS, 1);
109 *
110 * // Every 100 records update application-level status
111 * if ((noRecords%100) == 0) {
112 * reporter.setStatus(mapTaskId + " processed " + noRecords +
113 * " from input-file: " + inputFile);
114 * }
115 *
116 * // Output the result
117 * output.collect(key, val);
118 * }
119 * }
120 * </pre></blockquote></p>
121 *
122 * <p>Applications may write a custom {@link MapRunnable} to exert greater
123 * control on map processing e.g. multi-threaded <code>Mapper</code>s etc.</p>
124 *
125 * @see JobConf
126 * @see InputFormat
127 * @see Partitioner
128 * @see Reducer
129 * @see MapReduceBase
130 * @see MapRunnable
131 * @see SequenceFile
132 */
133 @InterfaceAudience.Public
134 @InterfaceStability.Stable
135 public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
136
137 /**
138 * Maps a single input key/value pair into an intermediate key/value pair.
139 *
140 * <p>Output pairs need not be of the same types as input pairs. A given
141 * input pair may map to zero or many output pairs. Output pairs are
142 * collected with calls to
143 * {@link OutputCollector#collect(Object,Object)}.</p>
144 *
145 * <p>Applications can use the {@link Reporter} provided to report progress
146 * or just indicate that they are alive. In scenarios where the application
147 * takes significant amount of time to process individual key/value
148 * pairs, this is crucial since the framework might assume that the task has
149 * timed-out and kill that task. The other way of avoiding this is to set
150 * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
151 * mapreduce.task.timeout</a> to a high-enough value (or even zero for no
152 * time-outs).</p>
153 *
154 * @param key the input key.
155 * @param value the input value.
156 * @param output collects mapped keys and values.
157 * @param reporter facility to report progress.
158 */
159 void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
160 throws IOException;
161 }