001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred; 020 021 import java.io.IOException; 022 023 import org.apache.hadoop.classification.InterfaceAudience; 024 import org.apache.hadoop.classification.InterfaceStability; 025 import org.apache.hadoop.fs.FileSystem; 026 import org.apache.hadoop.io.Closeable; 027 import org.apache.hadoop.io.SequenceFile; 028 import org.apache.hadoop.io.compress.CompressionCodec; 029 030 /** 031 * Maps input key/value pairs to a set of intermediate key/value pairs. 032 * 033 * <p>Maps are the individual tasks which transform input records into a 034 * intermediate records. The transformed intermediate records need not be of 035 * the same type as the input records. A given input pair may map to zero or 036 * many output pairs.</p> 037 * 038 * <p>The Hadoop Map-Reduce framework spawns one map task for each 039 * {@link InputSplit} generated by the {@link InputFormat} for the job. 040 * <code>Mapper</code> implementations can access the {@link JobConf} for the 041 * job via the {@link JobConfigurable#configure(JobConf)} and initialize 042 * themselves. Similarly they can use the {@link Closeable#close()} method for 043 * de-initialization.</p> 044 * 045 * <p>The framework then calls 046 * {@link #map(Object, Object, OutputCollector, Reporter)} 047 * for each key/value pair in the <code>InputSplit</code> for that task.</p> 048 * 049 * <p>All intermediate values associated with a given output key are 050 * subsequently grouped by the framework, and passed to a {@link Reducer} to 051 * determine the final output. Users can control the grouping by specifying 052 * a <code>Comparator</code> via 053 * {@link JobConf#setOutputKeyComparatorClass(Class)}.</p> 054 * 055 * <p>The grouped <code>Mapper</code> outputs are partitioned per 056 * <code>Reducer</code>. Users can control which keys (and hence records) go to 057 * which <code>Reducer</code> by implementing a custom {@link Partitioner}. 058 * 059 * <p>Users can optionally specify a <code>combiner</code>, via 060 * {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the 061 * intermediate outputs, which helps to cut down the amount of data transferred 062 * from the <code>Mapper</code> to the <code>Reducer</code>. 063 * 064 * <p>The intermediate, grouped outputs are always stored in 065 * {@link SequenceFile}s. Applications can specify if and how the intermediate 066 * outputs are to be compressed and which {@link CompressionCodec}s are to be 067 * used via the <code>JobConf</code>.</p> 068 * 069 * <p>If the job has 070 * <a href="{@docRoot}/org/apache/hadoop/mapred/JobConf.html#ReducerNone">zero 071 * reduces</a> then the output of the <code>Mapper</code> is directly written 072 * to the {@link FileSystem} without grouping by keys.</p> 073 * 074 * <p>Example:</p> 075 * <p><blockquote><pre> 076 * public class MyMapper<K extends WritableComparable, V extends Writable> 077 * extends MapReduceBase implements Mapper<K, V, K, V> { 078 * 079 * static enum MyCounters { NUM_RECORDS } 080 * 081 * private String mapTaskId; 082 * private String inputFile; 083 * private int noRecords = 0; 084 * 085 * public void configure(JobConf job) { 086 * mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID); 087 * inputFile = job.get(JobContext.MAP_INPUT_FILE); 088 * } 089 * 090 * public void map(K key, V val, 091 * OutputCollector<K, V> output, Reporter reporter) 092 * throws IOException { 093 * // Process the <key, value> pair (assume this takes a while) 094 * // ... 095 * // ... 096 * 097 * // Let the framework know that we are alive, and kicking! 098 * // reporter.progress(); 099 * 100 * // Process some more 101 * // ... 102 * // ... 103 * 104 * // Increment the no. of <key, value> pairs processed 105 * ++noRecords; 106 * 107 * // Increment counters 108 * reporter.incrCounter(NUM_RECORDS, 1); 109 * 110 * // Every 100 records update application-level status 111 * if ((noRecords%100) == 0) { 112 * reporter.setStatus(mapTaskId + " processed " + noRecords + 113 * " from input-file: " + inputFile); 114 * } 115 * 116 * // Output the result 117 * output.collect(key, val); 118 * } 119 * } 120 * </pre></blockquote></p> 121 * 122 * <p>Applications may write a custom {@link MapRunnable} to exert greater 123 * control on map processing e.g. multi-threaded <code>Mapper</code>s etc.</p> 124 * 125 * @see JobConf 126 * @see InputFormat 127 * @see Partitioner 128 * @see Reducer 129 * @see MapReduceBase 130 * @see MapRunnable 131 * @see SequenceFile 132 */ 133 @InterfaceAudience.Public 134 @InterfaceStability.Stable 135 public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable { 136 137 /** 138 * Maps a single input key/value pair into an intermediate key/value pair. 139 * 140 * <p>Output pairs need not be of the same types as input pairs. A given 141 * input pair may map to zero or many output pairs. Output pairs are 142 * collected with calls to 143 * {@link OutputCollector#collect(Object,Object)}.</p> 144 * 145 * <p>Applications can use the {@link Reporter} provided to report progress 146 * or just indicate that they are alive. In scenarios where the application 147 * takes significant amount of time to process individual key/value 148 * pairs, this is crucial since the framework might assume that the task has 149 * timed-out and kill that task. The other way of avoiding this is to set 150 * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout"> 151 * mapreduce.task.timeout</a> to a high-enough value (or even zero for no 152 * time-outs).</p> 153 * 154 * @param key the input key. 155 * @param value the input value. 156 * @param output collects mapped keys and values. 157 * @param reporter facility to report progress. 158 */ 159 void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) 160 throws IOException; 161 }