/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

/**
 * Reduces a set of intermediate values which share a key to a smaller set of
 * values.
 *
 * <p><code>Reducer</code> implementations
 * can access the {@link Configuration} for the job via the
 * {@link JobContext#getConfiguration()} method.</p>
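 *
 * <p>For example, a minimal sketch of reading a job setting in an overridden
 * {@link #setup(Context)} (the <code>my.app.threshold</code> property is
 * hypothetical):</p>
 * <pre>
 * protected void setup(Context context) {
 *   Configuration conf = context.getConfiguration();
 *   // Illustrative, application-defined property with a default of 10
 *   int threshold = conf.getInt("my.app.threshold", 10);
 * }
 * </pre>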
 *
 * <p><code>Reducer</code> has 3 primary phases:</p>
 * <ol>
 *   <li>
 *   <h4 id="Shuffle">Shuffle</h4>
 *
 *   <p>The <code>Reducer</code> copies the sorted output from each
 *   {@link Mapper} using HTTP across the network.</p>
 *   </li>
 *
 *   <li>
 *   <h4 id="Sort">Sort</h4>
 *
 *   <p>The framework merge-sorts <code>Reducer</code> inputs by
 *   <code>key</code>s
 *   (since different <code>Mapper</code>s may have output the same key).</p>
 *
 *   <p>The shuffle and sort phases occur simultaneously, i.e. while outputs
 *   are being fetched they are merged.</p>
 *
 *   <h5 id="SecondarySort">Secondary Sort</h5>
 *
 *   <p>To achieve a secondary sort on the values returned by the value
 *   iterator, the application should extend the key with the secondary
 *   key and define a grouping comparator. The keys will be sorted using the
 *   entire key, but will be grouped using the grouping comparator to decide
 *   which keys and values are sent in the same call to reduce. The grouping
 *   comparator is specified via
 *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
 *   controlled by
 *   {@link Job#setSortComparatorClass(Class)}.</p>
 *
 *   <p>For example, say that you want to find duplicate web pages and tag them
 *   all with the url of the "best" known example. You would set up the job
 *   like this (see the sketch after the list):</p>
 *   <ul>
 *     <li>Map Input Key: url</li>
 *     <li>Map Input Value: document</li>
 *     <li>Map Output Key: document checksum, url pagerank</li>
 *     <li>Map Output Value: url</li>
 *     <li>Partitioner: by checksum</li>
 *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
 *     <li>OutputValueGroupingComparator: by checksum</li>
 *   </ul>
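 *
 *   <p>In code, that configuration might look like the following sketch; the
 *   partitioner and comparator class names are hypothetical:</p>
 *   <pre>
 *   job.setPartitionerClass(ChecksumPartitioner.class);
 *   job.setSortComparatorClass(ChecksumThenPageRankComparator.class);
 *   job.setGroupingComparatorClass(ChecksumGroupingComparator.class);
 *   </pre>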
 *   </li>
 *
 *   <li>
 *   <h4 id="Reduce">Reduce</h4>
 *
 *   <p>In this phase the
 *   {@link #reduce(Object, Iterable, Context)}
 *   method is called for each <code>&lt;key, (collection of values)&gt;</code>
 *   in the sorted inputs.</p>
 *
 *   <p>The output of the reduce task is typically written to a
 *   {@link RecordWriter} via
 *   {@link Context#write(Object, Object)}.</p>
 *   </li>
 * </ol>
 *
 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
 *
 * <p>Example:</p>
 * <blockquote><pre>
 * public class IntSumReducer&lt;Key&gt; extends Reducer&lt;Key,IntWritable,
 *                                                 Key,IntWritable&gt; {
 *   private IntWritable result = new IntWritable();
 *
 *   public void reduce(Key key, Iterable&lt;IntWritable&gt; values,
 *                      Context context) throws IOException, InterruptedException {
 *     int sum = 0;
 *     for (IntWritable val : values) {
 *       sum += val.get();
 *     }
 *     result.set(sum);
 *     context.write(key, result);
 *   }
 * }
 * </pre></blockquote>
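 *
 * <p>A reducer like this is registered on the job via
 * {@link Job#setReducerClass(Class)}; because integer summation is
 * associative and commutative, the same class can also be used as a combiner
 * (a sketch, assuming a {@link Job} instance named <code>job</code>):</p>
 * <blockquote><pre>
 * job.setReducerClass(IntSumReducer.class);
 * job.setCombinerClass(IntSumReducer.class);
 * </pre></blockquote>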
 *
 * @see Mapper
 * @see Partitioner
 */
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Reducer} implementations.
   */
  public abstract class Context
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
      throws IOException, InterruptedException {
    for (VALUEIN value : values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
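   *
   * <p>For example, a hypothetical override that stops consuming input after
   * a fixed number of keys (a simplified sketch; it omits the backup-store
   * reset that the default implementation performs, and the limit is
   * illustrative):</p>
   * <pre>
   * public void run(Context context) throws IOException, InterruptedException {
   *   setup(context);
   *   try {
   *     int keys = 0;
   *     while (context.nextKey() &amp;&amp; keys++ &lt; 1000) {
   *       reduce(context.getCurrentKey(), context.getValues(), context);
   *     }
   *   } finally {
   *     cleanup(context);
   *   }
   * }
   * </pre>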
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a backup store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if (iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
        }
      }
    } finally {
      cleanup(context);
    }
  }
}