001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred;
020
021import java.io.IOException;
022
023import java.util.Iterator;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.io.Closeable;
029
030/** 
031 * Reduces a set of intermediate values which share a key to a smaller set of
032 * values.  
033 * 
034 * <p>The number of <code>Reducer</code>s for the job is set by the user via 
035 * {@link JobConf#setNumReduceTasks(int)}. <code>Reducer</code> implementations 
036 * can access the {@link JobConf} for the job via the 
037 * {@link JobConfigurable#configure(JobConf)} method and initialize themselves. 
038 * Similarly they can use the {@link Closeable#close()} method for
039 * de-initialization.</p>
040
041 * <p><code>Reducer</code> has 3 primary phases:</p>
042 * <ol>
043 *   <li>
044 *   
045 *   <b id="Shuffle">Shuffle</b>
046 *   
047 *   <p><code>Reducer</code> is input the grouped output of a {@link Mapper}.
048 *   In the phase the framework, for each <code>Reducer</code>, fetches the 
049 *   relevant partition of the output of all the <code>Mapper</code>s, via HTTP. 
050 *   </p>
051 *   </li>
052 *   
053 *   <li>
054 *   <b id="Sort">Sort</b>
055 *   
056 *   <p>The framework groups <code>Reducer</code> inputs by <code>key</code>s 
057 *   (since different <code>Mapper</code>s may have output the same key) in this
058 *   stage.</p>
059 *   
060 *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
061 *   being fetched they are merged.</p>
062 *      
063 *   <b id="SecondarySort">SecondarySort</b>
064 *   
065 *   <p>If equivalence rules for keys while grouping the intermediates are 
066 *   different from those for grouping keys before reduction, then one may 
067 *   specify a <code>Comparator</code> via 
068 *   {@link JobConf#setOutputValueGroupingComparator(Class)}.Since 
069 *   {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to 
070 *   control how intermediate keys are grouped, these can be used in conjunction 
071 *   to simulate <i>secondary sort on values</i>.</p>
072 *   
073 *   
074 *   For example, say that you want to find duplicate web pages and tag them 
075 *   all with the url of the "best" known example. You would set up the job 
076 *   like:
077 *   <ul>
078 *     <li>Map Input Key: url</li>
079 *     <li>Map Input Value: document</li>
080 *     <li>Map Output Key: document checksum, url pagerank</li>
081 *     <li>Map Output Value: url</li>
082 *     <li>Partitioner: by checksum</li>
083 *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
084 *     <li>OutputValueGroupingComparator: by checksum</li>
085 *   </ul>
086 *   </li>
087 *   
088 *   <li>   
089 *   <b id="Reduce">Reduce</b>
090 *   
091 *   <p>In this phase the 
092 *   {@link #reduce(Object, Iterator, OutputCollector, Reporter)}
093 *   method is called for each <code>&lt;key, (list of values)&gt;</code> pair in
094 *   the grouped inputs.</p>
095 *   <p>The output of the reduce task is typically written to the 
096 *   {@link FileSystem} via 
097 *   {@link OutputCollector#collect(Object, Object)}.</p>
098 *   </li>
099 * </ol>
100 * 
101 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
102 * 
103 * <p>Example:</p>
104 * <p><blockquote><pre>
105 *     public class MyReducer&lt;K extends WritableComparable, V extends Writable&gt; 
106 *     extends MapReduceBase implements Reducer&lt;K, V, K, V&gt; {
107 *     
108 *       static enum MyCounters { NUM_RECORDS }
109 *        
110 *       private String reduceTaskId;
111 *       private int noKeys = 0;
112 *       
113 *       public void configure(JobConf job) {
114 *         reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
115 *       }
116 *       
117 *       public void reduce(K key, Iterator&lt;V&gt; values,
118 *                          OutputCollector&lt;K, V&gt; output, 
119 *                          Reporter reporter)
120 *       throws IOException {
121 *       
122 *         // Process
123 *         int noValues = 0;
124 *         while (values.hasNext()) {
125 *           V value = values.next();
126 *           
127 *           // Increment the no. of values for this key
128 *           ++noValues;
129 *           
130 *           // Process the &lt;key, value&gt; pair (assume this takes a while)
131 *           // ...
132 *           // ...
133 *           
134 *           // Let the framework know that we are alive, and kicking!
135 *           if ((noValues%10) == 0) {
136 *             reporter.progress();
137 *           }
138 *         
139 *           // Process some more
140 *           // ...
141 *           // ...
142 *           
143 *           // Output the &lt;key, value&gt; 
144 *           output.collect(key, value);
145 *         }
146 *         
147 *         // Increment the no. of &lt;key, list of values&gt; pairs processed
148 *         ++noKeys;
149 *         
150 *         // Increment counters
151 *         reporter.incrCounter(NUM_RECORDS, 1);
152 *         
153 *         // Every 100 keys update application-level status
154 *         if ((noKeys%100) == 0) {
155 *           reporter.setStatus(reduceTaskId + " processed " + noKeys);
156 *         }
157 *       }
158 *     }
159 * </pre></blockquote>
160 * 
161 * @see Mapper
162 * @see Partitioner
163 * @see Reporter
164 * @see MapReduceBase
165 */
166@InterfaceAudience.Public
167@InterfaceStability.Stable
168public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
169  
170  /** 
171   * <i>Reduces</i> values for a given key.  
172   * 
173   * <p>The framework calls this method for each 
174   * <code>&lt;key, (list of values)&gt;</code> pair in the grouped inputs.
175   * Output values must be of the same type as input values.  Input keys must 
176   * not be altered. The framework will <b>reuse</b> the key and value objects
177   * that are passed into the reduce, therefore the application should clone
178   * the objects they want to keep a copy of. In many cases, all values are 
179   * combined into zero or one value.
180   * </p>
181   *   
182   * <p>Output pairs are collected with calls to  
183   * {@link OutputCollector#collect(Object,Object)}.</p>
184   *
185   * <p>Applications can use the {@link Reporter} provided to report progress 
186   * or just indicate that they are alive. In scenarios where the application 
187   * takes a significant amount of time to process individual key/value 
188   * pairs, this is crucial since the framework might assume that the task has 
189   * timed-out and kill that task. The other way of avoiding this is to set 
190   * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.task.timeout">
191   * mapreduce.task.timeout</a> to a high-enough value (or even zero for no 
192   * time-outs).</p>
193   * 
194   * @param key the key.
195   * @param values the list of values to reduce.
196   * @param output to collect keys and combined values.
197   * @param reporter facility to report progress.
198   */
199  void reduce(K2 key, Iterator<V2> values,
200              OutputCollector<K3, V3> output, Reporter reporter)
201    throws IOException;
202
203}