/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Closeable;

/**
 * Reduces a set of intermediate values which share a key to a smaller set of
 * values.
 *
 * <p>The number of <code>Reducer</code>s for the job is set by the user via
 * {@link JobConf#setNumReduceTasks(int)}. <code>Reducer</code> implementations
 * can access the {@link JobConf} for the job via the
 * {@link JobConfigurable#configure(JobConf)} method and initialize themselves.
 * Similarly they can use the {@link Closeable#close()} method for
 * de-initialization.</p>
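 *
 * <p>For instance, a minimal sketch of per-task initialization and
 * de-initialization (the property name <code>example.min.sum</code> is
 * illustrative, not a framework setting):</p>
 * <p><blockquote><pre>
 * public class SumReducer extends MapReduceBase
 *     implements Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {
 *
 *   private int minSum;
 *
 *   public void configure(JobConf job) {
 *     // Initialize from the job configuration
 *     minSum = job.getInt("example.min.sum", 0);
 *   }
 *
 *   public void close() throws IOException {
 *     // Release any resources acquired in configure()
 *   }
 *
 *   public void reduce(Text key, Iterator&lt;IntWritable&gt; values,
 *                      OutputCollector&lt;Text, IntWritable&gt; output,
 *                      Reporter reporter) throws IOException {
 *     int sum = 0;
 *     while (values.hasNext()) {
 *       sum += values.next().get();
 *     }
 *     if (sum >= minSum) {          // emit only totals above the threshold
 *       output.collect(key, new IntWritable(sum));
 *     }
 *   }
 * }
 * </pre></blockquote></p>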
 *
 * <p><code>Reducer</code> has 3 primary phases:</p>
 * <ol>
 * <li>
 *
 * <h4 id="Shuffle">Shuffle</h4>
 *
 * <p>The input to a <code>Reducer</code> is the grouped output of a
 * {@link Mapper}. In this phase the framework fetches, for each
 * <code>Reducer</code>, the relevant partition of the output of all the
 * <code>Mapper</code>s, via HTTP.</p>
 * </li>
 *
 * <li>
 * <h4 id="Sort">Sort</h4>
 *
 * <p>In this stage the framework groups <code>Reducer</code> inputs by
 * <code>key</code> (since different <code>Mapper</code>s may have output the
 * same key).</p>
 *
 * <p>The shuffle and sort phases occur simultaneously, i.e. while map outputs
 * are being fetched they are merged.</p>
 *
 * <h5 id="SecondarySort">Secondary Sort</h5>
 *
 * <p>If the equivalence rules for grouping keys in the reduce differ from
 * those used to sort the intermediate outputs, one may specify a
 * <code>Comparator</code> via
 * {@link JobConf#setOutputValueGroupingComparator(Class)}. Since
 * {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to control
 * how intermediate keys are sorted, the two can be used in conjunction to
 * simulate a <i>secondary sort on values</i>.</p>
 *
 * <p>For example, say that you want to find duplicate web pages and tag them
 * all with the URL of the "best" known example. You would set up the job
 * like this (a configuration sketch follows the list):</p>
 * <ul>
 * <li>Map Input Key: url</li>
 * <li>Map Input Value: document</li>
 * <li>Map Output Key: document checksum, url pagerank</li>
 * <li>Map Output Value: url</li>
 * <li>Partitioner: by checksum</li>
 * <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
 * <li>OutputValueGroupingComparator: by checksum</li>
 * </ul>
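 *
 * <p>A sketch of the corresponding job wiring; the key, partitioner and
 * comparator class names are hypothetical placeholders for user-supplied
 * classes:</p>
 * <p><blockquote><pre>
 * JobConf job = new JobConf(DedupJob.class);
 * job.setMapOutputKeyClass(ChecksumRankPair.class);    // checksum + pagerank
 * job.setMapOutputValueClass(Text.class);              // the url
 * job.setPartitionerClass(ChecksumPartitioner.class);  // partition on checksum only
 * job.setOutputKeyComparatorClass(ChecksumThenDecreasingRankComparator.class);
 * job.setOutputValueGroupingComparator(ChecksumComparator.class);
 * </pre></blockquote></p>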
 * </li>
 *
 * <li>
 * <h4 id="Reduce">Reduce</h4>
 *
 * <p>In this phase the
 * {@link #reduce(Object, Iterator, OutputCollector, Reporter)}
 * method is called for each <code>&lt;key, (list of values)&gt;</code> pair in
 * the grouped inputs.</p>
 * <p>The output of the reduce task is typically written to the
 * {@link FileSystem} via
 * {@link OutputCollector#collect(Object, Object)}.</p>
 * </li>
 * </ol>
 *
 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
 *
 * <p>Example:</p>
 * <p><blockquote><pre>
 * public class MyReducer&lt;K extends WritableComparable, V extends Writable&gt;
 *     extends MapReduceBase implements Reducer&lt;K, V, K, V&gt; {
 *
 *   static enum MyCounters { NUM_RECORDS }
 *
 *   private String reduceTaskId;
 *   private int noKeys = 0;
 *
 *   public void configure(JobConf job) {
 *     reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
 *   }
 *
 *   public void reduce(K key, Iterator&lt;V&gt; values,
 *                      OutputCollector&lt;K, V&gt; output,
 *                      Reporter reporter) throws IOException {
 *
 *     // Process
 *     int noValues = 0;
 *     while (values.hasNext()) {
 *       V value = values.next();
 *
 *       // Increment the no. of values for this key
 *       ++noValues;
 *
 *       // Process the &lt;key, value&gt; pair (assume this takes a while)
 *       // ...
 *       // ...
 *
 *       // Let the framework know that we are alive, and kicking!
 *       if ((noValues % 10) == 0) {
 *         reporter.progress();
 *       }
 *
 *       // Process some more
 *       // ...
 *       // ...
 *
 *       // Output the &lt;key, value&gt;
 *       output.collect(key, value);
 *     }
 *
 *     // Increment the no. of &lt;key, list of values&gt; pairs processed
 *     ++noKeys;
 *
 *     // Increment counters
 *     reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
 *
 *     // Every 100 keys update application-level status
 *     if ((noKeys % 100) == 0) {
 *       reporter.setStatus(reduceTaskId + " processed " + noKeys);
 *     }
 *   }
 * }
 * </pre></blockquote></p>
 *
 * @see Mapper
 * @see Partitioner
 * @see Reporter
 * @see MapReduceBase
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

  /**
   * <i>Reduces</i> values for a given key.
   *
   * <p>The framework calls this method for each
   * <code>&lt;key, (list of values)&gt;</code> pair in the grouped inputs.
   * Output values must be of the same type as input values. Input keys must
   * not be altered. The framework will <b>reuse</b> the key and value objects
   * that are passed into the reduce, therefore the application should clone
   * the objects it wants to keep a copy of. In many cases, all values are
   * combined into zero or one value.
   * </p>
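   *
   * <p>For example, a sketch of defensively copying values that must outlive
   * the current iteration (shown for <code>Text</code> values;
   * <code>WritableUtils.clone</code> is an alternative for arbitrary
   * <code>Writable</code>s):</p>
   * <p><blockquote><pre>
   * // Values objects are reused by the framework, so copy what you keep
   * List&lt;Text&gt; kept = new ArrayList&lt;Text&gt;();
   * while (values.hasNext()) {
   *   kept.add(new Text(values.next()));  // defensive copy
   * }
   * </pre></blockquote></p>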
   *
   * <p>Output pairs are collected with calls to
   * {@link OutputCollector#collect(Object,Object)}.</p>
   *
   * <p>Applications can use the {@link Reporter} provided to report progress
   * or just indicate that they are alive. In scenarios where the application
   * takes a significant amount of time to process individual key/value
   * pairs, this is crucial since the framework might assume that the task
   * has timed out and kill it. Another way of avoiding this is to set
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
   * mapreduce.task.timeout</a> to a high-enough value (or even zero for no
   * time-outs).</p>
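   *
   * <p>For instance, one way to raise the timeout from the {@link JobConf}
   * (the value is in milliseconds; zero disables the timeout):</p>
   * <p><blockquote><pre>
   * // Allow up to ten minutes between progress reports
   * job.setLong("mapreduce.task.timeout", 600000L);
   * </pre></blockquote></p>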
193 *
194 * @param key the key.
195 * @param values the list of values to reduce.
196 * @param output to collect keys and combined values.
197 * @param reporter facility to report progress.
198 */
199 void reduce(K2 key, Iterator<V2> values,
200 OutputCollector<K3, V3> output, Reporter reporter)
201 throws IOException;
202
203 }