001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce;
020
021import java.io.IOException;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
027
028import java.util.Iterator;
029
030/** 
031 * Reduces a set of intermediate values which share a key to a smaller set of
032 * values.  
033 * 
034 * <p><code>Reducer</code> implementations 
035 * can access the {@link Configuration} for the job via the 
036 * {@link JobContext#getConfiguration()} method.</p>
037
038 * <p><code>Reducer</code> has 3 primary phases:</p>
039 * <ol>
040 *   <li>
041 *   
042 *   <b id="Shuffle">Shuffle</b>
043 *   
044 *   <p>The <code>Reducer</code> copies the sorted output from each 
045 *   {@link Mapper} using HTTP across the network.</p>
046 *   </li>
047 *   
048 *   <li>
049 *   <b id="Sort">Sort</b>
050 *   
051 *   <p>The framework merge sorts <code>Reducer</code> inputs by 
052 *   <code>key</code>s 
053 *   (since different <code>Mapper</code>s may have output the same key).</p>
054 *   
055 *   <p>The shuffle and sort phases occur simultaneously, i.e. while outputs are
056 *   being fetched they are merged.</p>
057 *      
058 *   <b id="SecondarySort">SecondarySort</b>
059 *   
060 *   <p>To achieve a secondary sort on the values returned by the value 
061 *   iterator, the application should extend the key with the secondary
062 *   key and define a grouping comparator. The keys will be sorted using the
063 *   entire key, but will be grouped using the grouping comparator to decide
064 *   which keys and values are sent in the same call to reduce. The grouping
065 *   comparator is specified via 
066 *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
067 *   controlled by 
068 *   {@link Job#setSortComparatorClass(Class)}.</p>
069 *   
070 *   
071 *   For example, say that you want to find duplicate web pages and tag them 
072 *   all with the url of the "best" known example. You would set up the job 
073 *   like:
074 *   <ul>
075 *     <li>Map Input Key: url</li>
076 *     <li>Map Input Value: document</li>
077 *     <li>Map Output Key: document checksum, url pagerank</li>
078 *     <li>Map Output Value: url</li>
079 *     <li>Partitioner: by checksum</li>
080 *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
081 *     <li>OutputValueGroupingComparator: by checksum</li>
082 *   </ul>
083 *   </li>
084 *   
085 *   <li>   
086 *   <b id="Reduce">Reduce</b>
087 *   
088 *   <p>In this phase the 
089 *   {@link #reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)}
090 *   method is called for each <code>&lt;key, (collection of values)&gt;</code> in
091 *   the sorted inputs.</p>
092 *   <p>The output of the reduce task is typically written to a 
093 *   {@link RecordWriter} via 
094 *   {@link Context#write(Object, Object)}.</p>
095 *   </li>
096 * </ol>
097 * 
098 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
099 * 
100 * <p>Example:</p>
101 * <p><blockquote><pre>
102 * public class IntSumReducer&lt;Key&gt; extends Reducer&lt;Key,IntWritable,
103 *                                                 Key,IntWritable&gt; {
104 *   private IntWritable result = new IntWritable();
105 * 
106 *   public void reduce(Key key, Iterable&lt;IntWritable&gt; values,
107 *                      Context context) throws IOException, InterruptedException {
108 *     int sum = 0;
109 *     for (IntWritable val : values) {
110 *       sum += val.get();
111 *     }
112 *     result.set(sum);
113 *     context.write(key, result);
114 *   }
115 * }
116 * </pre></blockquote>
117 * 
118 * @see Mapper
119 * @see Partitioner
120 */
121@Checkpointable
122@InterfaceAudience.Public
123@InterfaceStability.Stable
124public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
125
126  /**
127   * The <code>Context</code> passed on to the {@link Reducer} implementations.
128   */
129  public abstract class Context 
130    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
131  }
132
133  /**
134   * Called once at the start of the task.
135   */
136  protected void setup(Context context
137                       ) throws IOException, InterruptedException {
138    // NOTHING
139  }
140
141  /**
142   * This method is called once for each key. Most applications will define
143   * their reduce class by overriding this method. The default implementation
144   * is an identity function.
145   */
146  @SuppressWarnings("unchecked")
147  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
148                        ) throws IOException, InterruptedException {
149    for(VALUEIN value: values) {
150      context.write((KEYOUT) key, (VALUEOUT) value);
151    }
152  }
153
154  /**
155   * Called once at the end of the task.
156   */
157  protected void cleanup(Context context
158                         ) throws IOException, InterruptedException {
159    // NOTHING
160  }
161
162  /**
163   * Advanced application writers can use the 
164   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
165   * control how the reduce task works.
166   */
167  public void run(Context context) throws IOException, InterruptedException {
168    setup(context);
169    try {
170      while (context.nextKey()) {
171        reduce(context.getCurrentKey(), context.getValues(), context);
172        // If a back up store is used, reset it
173        Iterator<VALUEIN> iter = context.getValues().iterator();
174        if(iter instanceof ReduceContext.ValueIterator) {
175          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
176        }
177      }
178    } finally {
179      cleanup(context);
180    }
181  }
182}