/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Closeable;
/**
 * Reduces a set of intermediate values which share a key to a smaller set of
 * values.
 *
 * <p>The number of <code>Reducer</code>s for the job is set by the user via
 * {@link JobConf#setNumReduceTasks(int)}. <code>Reducer</code> implementations
 * can access the {@link JobConf} for the job via the
 * {@link JobConfigurable#configure(JobConf)} method and initialize themselves.
 * Similarly they can use the {@link Closeable#close()} method for
 * de-initialization.</p>
 *
 * <p><code>Reducer</code> has 3 primary phases:</p>
 * <ol>
 *   <li>
 *   <h4 id="Shuffle">Shuffle</h4>
 *
 *   <p>The <code>Reducer</code> receives the grouped output of the
 *   {@link Mapper}s as its input. In this phase the framework, for each
 *   <code>Reducer</code>, fetches the relevant partition of the output of
 *   all the <code>Mapper</code>s, via HTTP.</p>
 *   </li>
 *
 *   <li>
 *   <h4 id="Sort">Sort</h4>
 *
 *   <p>In this stage the framework groups <code>Reducer</code> inputs by
 *   <code>key</code>s (since different <code>Mapper</code>s may have output
 *   the same key).</p>
 *
 *   <p>The shuffle and sort phases occur simultaneously, i.e. while outputs
 *   are being fetched they are merged.</p>
 *
 *   <h5 id="SecondarySort">Secondary Sort</h5>
 *
 *   <p>If the equivalence rules for grouping the intermediate keys are
 *   different from those for grouping keys before reduction, then one may
 *   specify a <code>Comparator</code> via
 *   {@link JobConf#setOutputValueGroupingComparator(Class)}. Since
 *   {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to
 *   control how intermediate keys are sorted, these can be used in
 *   conjunction to simulate <i>secondary sort on values</i>.</p>
 *
 *   <p>For example, say that you want to find duplicate web pages and tag
 *   them all with the url of the "best" known example. You would set up
 *   the job like this, as sketched below:</p>
 *   <ul>
 *     <li>Map Input Key: url</li>
 *     <li>Map Input Value: document</li>
 *     <li>Map Output Key: document checksum, url pagerank</li>
 *     <li>Map Output Value: url</li>
 *     <li>Partitioner: by checksum</li>
 *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
 *     <li>OutputValueGroupingComparator: by checksum</li>
 *   </ul>
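 *
 *   <p>A minimal configuration sketch of this setup (the comparator and
 *   partitioner class names here are hypothetical placeholders for
 *   application code, not classes shipped with Hadoop):</p>
 *   <p><blockquote><pre>
 *     JobConf job = new JobConf(FindDuplicatesJob.class);
 *
 *     // Sort the composite keys by checksum, then by decreasing pagerank
 *     job.setOutputKeyComparatorClass(ChecksumThenPagerankComparator.class);
 *
 *     // Group values for a reduce call by checksum alone, so every url
 *     // with the same checksum arrives in one call, best pagerank first
 *     job.setOutputValueGroupingComparator(ChecksumComparator.class);
 *
 *     // Partition on the checksum so equal checksums meet at one reducer
 *     job.setPartitionerClass(ChecksumPartitioner.class);
 *   </pre></blockquote></p>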
 *   </li>
 *
 *   <li>
 *   <h4 id="Reduce">Reduce</h4>
 *
 *   <p>In this phase the
 *   {@link #reduce(Object, Iterator, OutputCollector, Reporter)}
 *   method is called for each <code>&lt;key, (list of values)&gt;</code>
 *   pair in the grouped inputs.</p>
 *   <p>The output of the reduce task is typically written to the
 *   {@link FileSystem} via
 *   {@link OutputCollector#collect(Object, Object)}.</p>
 *   </li>
 * </ol>
 *
 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
 *
 * <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyReducer&lt;K extends WritableComparable, V extends Writable&gt;
 *     extends MapReduceBase implements Reducer&lt;K, V, K, V&gt; {
 *
 *       static enum MyCounters { NUM_RECORDS }
 *
 *       private String reduceTaskId;
 *       private int noKeys = 0;
 *
 *       public void configure(JobConf job) {
 *         reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
 *       }
 *
 *       public void reduce(K key, Iterator&lt;V&gt; values,
 *                          OutputCollector&lt;K, V&gt; output,
 *                          Reporter reporter)
 *       throws IOException {
 *
 *         // Process
 *         int noValues = 0;
 *         while (values.hasNext()) {
 *           V value = values.next();
 *
 *           // Increment the no. of values for this key
 *           ++noValues;
 *
 *           // Process the &lt;key, value&gt; pair (assume this takes a while)
 *           // ...
 *           // ...
 *
 *           // Let the framework know that we are alive, and kicking!
 *           if ((noValues % 10) == 0) {
 *             reporter.progress();
 *           }
 *
 *           // Process some more
 *           // ...
 *           // ...
 *
 *           // Output the &lt;key, value&gt;
 *           output.collect(key, value);
 *         }
 *
 *         // Increment the no. of &lt;key, list of values&gt; pairs processed
 *         ++noKeys;
 *
 *         // Increment counters
 *         reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
 *
 *         // Every 100 keys update application-level status
 *         if ((noKeys % 100) == 0) {
 *           reporter.setStatus(reduceTaskId + " processed " + noKeys);
 *         }
 *       }
 *     }
 * </pre></blockquote></p>
 *
 * @see Mapper
 * @see Partitioner
 * @see Reporter
 * @see MapReduceBase
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

  /**
   * <i>Reduces</i> values for a given key.
   *
   * <p>The framework calls this method for each
   * <code>&lt;key, (list of values)&gt;</code> pair in the grouped inputs.
   * When the reducer class also serves as the combiner, output values must
   * be of the same type as input values. Input keys must not be altered.
   * The framework will <b>reuse</b> the key and value objects that are
   * passed into the reduce, therefore applications should clone the objects
   * they want to keep a copy of, as sketched below. In many cases, all
   * values are combined into zero or one value.
   * </p>
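   *
   * <p>For instance, a minimal sketch of holding on to one of the reused
   * value objects across iterations (this assumes the value type implements
   * {@link org.apache.hadoop.io.Writable}, and that <code>conf</code> and
   * <code>comparator</code> are illustrative fields the application set up
   * in <code>configure</code>, not part of this API):</p>
   * <p><blockquote><pre>
   *     V best = null;
   *     while (values.hasNext()) {
   *       V value = values.next();
   *       if (best == null || comparator.compare(value, best) &gt; 0) {
   *         // 'value' will be overwritten on the next call to next(),
   *         // so keep a deep copy instead of the reference
   *         best = WritableUtils.clone(value, conf);
   *       }
   *     }
   * </pre></blockquote></p>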
   *
   * <p>Output pairs are collected with calls to
   * {@link OutputCollector#collect(Object,Object)}.</p>
   *
   * <p>Applications can use the {@link Reporter} provided to report progress
   * or just indicate that they are alive. In scenarios where the application
   * takes a significant amount of time to process individual key/value
   * pairs, this is crucial since the framework might otherwise assume that
   * the task has timed out and kill it. Another way to avoid this is to set
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
   * mapreduce.task.timeout</a> to a high-enough value (or even zero for no
   * time-outs), as sketched below.</p>
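   *
   * <p>A minimal sketch of raising the timeout programmatically (the value
   * shown is illustrative only):</p>
   * <p><blockquote><pre>
   *     JobConf job = new JobConf();
   *     // 20 minutes in milliseconds; a value of 0 disables the timeout
   *     job.setLong("mapreduce.task.timeout", 20 * 60 * 1000);
   * </pre></blockquote></p>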
   *
   * @param key the key.
   * @param values the list of values to reduce.
   * @param output to collect keys and combined values.
   * @param reporter facility to report progress.
   */
  void reduce(K2 key, Iterator<V2> values,
              OutputCollector<K3, V3> output, Reporter reporter)
    throws IOException;

}