001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
027
028 import java.util.Iterator;
029
030 /**
031 * Reduces a set of intermediate values which share a key to a smaller set of
032 * values.
033 *
034 * <p><code>Reducer</code> implementations
035 * can access the {@link Configuration} for the job via the
036 * {@link JobContext#getConfiguration()} method.</p>
037
038 * <p><code>Reducer</code> has 3 primary phases:</p>
039 * <ol>
040 * <li>
041 *
042 * <h4 id="Shuffle">Shuffle</h4>
043 *
044 * <p>The <code>Reducer</code> copies the sorted output from each
045 * {@link Mapper} using HTTP across the network.</p>
046 * </li>
047 *
048 * <li>
049 * <h4 id="Sort">Sort</h4>
050 *
051 * <p>The framework merge sorts <code>Reducer</code> inputs by
052 * <code>key</code>s
053 * (since different <code>Mapper</code>s may have output the same key).</p>
054 *
055 * <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
056 * being fetched they are merged.</p>
057 *
058 * <h5 id="SecondarySort">SecondarySort</h5>
059 *
060 * <p>To achieve a secondary sort on the values returned by the value
061 * iterator, the application should extend the key with the secondary
062 * key and define a grouping comparator. The keys will be sorted using the
063 * entire key, but will be grouped using the grouping comparator to decide
064 * which keys and values are sent in the same call to reduce.The grouping
065 * comparator is specified via
066 * {@link Job#setGroupingComparatorClass(Class)}. The sort order is
067 * controlled by
068 * {@link Job#setSortComparatorClass(Class)}.</p>
069 *
070 *
071 * For example, say that you want to find duplicate web pages and tag them
072 * all with the url of the "best" known example. You would set up the job
073 * like:
074 * <ul>
075 * <li>Map Input Key: url</li>
076 * <li>Map Input Value: document</li>
077 * <li>Map Output Key: document checksum, url pagerank</li>
078 * <li>Map Output Value: url</li>
079 * <li>Partitioner: by checksum</li>
080 * <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
081 * <li>OutputValueGroupingComparator: by checksum</li>
082 * </ul>
083 * </li>
084 *
085 * <li>
086 * <h4 id="Reduce">Reduce</h4>
087 *
088 * <p>In this phase the
089 * {@link #reduce(Object, Iterable, Context)}
090 * method is called for each <code><key, (collection of values)></code> in
091 * the sorted inputs.</p>
092 * <p>The output of the reduce task is typically written to a
093 * {@link RecordWriter} via
094 * {@link Context#write(Object, Object)}.</p>
095 * </li>
096 * </ol>
097 *
098 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
099 *
100 * <p>Example:</p>
101 * <p><blockquote><pre>
102 * public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
103 * Key,IntWritable> {
104 * private IntWritable result = new IntWritable();
105 *
106 * public void reduce(Key key, Iterable<IntWritable> values,
107 * Context context) throws IOException, InterruptedException {
108 * int sum = 0;
109 * for (IntWritable val : values) {
110 * sum += val.get();
111 * }
112 * result.set(sum);
113 * context.write(key, result);
114 * }
115 * }
116 * </pre></blockquote></p>
117 *
118 * @see Mapper
119 * @see Partitioner
120 */
121 @Checkpointable
122 @InterfaceAudience.Public
123 @InterfaceStability.Stable
124 public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
125
126 /**
127 * The <code>Context</code> passed on to the {@link Reducer} implementations.
128 */
129 public abstract class Context
130 implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
131 }
132
133 /**
134 * Called once at the start of the task.
135 */
136 protected void setup(Context context
137 ) throws IOException, InterruptedException {
138 // NOTHING
139 }
140
141 /**
142 * This method is called once for each key. Most applications will define
143 * their reduce class by overriding this method. The default implementation
144 * is an identity function.
145 */
146 @SuppressWarnings("unchecked")
147 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
148 ) throws IOException, InterruptedException {
149 for(VALUEIN value: values) {
150 context.write((KEYOUT) key, (VALUEOUT) value);
151 }
152 }
153
154 /**
155 * Called once at the end of the task.
156 */
157 protected void cleanup(Context context
158 ) throws IOException, InterruptedException {
159 // NOTHING
160 }
161
162 /**
163 * Advanced application writers can use the
164 * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
165 * control how the reduce task works.
166 */
167 public void run(Context context) throws IOException, InterruptedException {
168 setup(context);
169 try {
170 while (context.nextKey()) {
171 reduce(context.getCurrentKey(), context.getValues(), context);
172 // If a back up store is used, reset it
173 Iterator<VALUEIN> iter = context.getValues().iterator();
174 if(iter instanceof ReduceContext.ValueIterator) {
175 ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
176 }
177 }
178 } finally {
179 cleanup(context);
180 }
181 }
182 }