/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Closeable;

/**
 * Reduces a set of intermediate values which share a key to a smaller set of
 * values.
 *
 * <p>The number of <code>Reducer</code>s for the job is set by the user via
 * {@link JobConf#setNumReduceTasks(int)}. <code>Reducer</code> implementations
 * can access the {@link JobConf} for the job via the
 * {@link JobConfigurable#configure(JobConf)} method and initialize themselves.
 * Similarly they can use the {@link Closeable#close()} method for
 * de-initialization.</p>
 *
 * <p><code>Reducer</code> has 3 primary phases:</p>
 * <ol>
 *   <li>
 *
 *   <h4 id="Shuffle">Shuffle</h4>
 *
 *   <p>The input to the <code>Reducer</code> is the grouped output of a
 *   {@link Mapper}.
 *   In this phase the framework, for each <code>Reducer</code>, fetches the
 *   relevant partition of the output of all the <code>Mapper</code>s via HTTP.
 *   </p>
 *   </li>
 *
 *   <li>
 *   <h4 id="Sort">Sort</h4>
 *
 *   <p>The framework groups <code>Reducer</code> inputs by <code>key</code>s
 *   (since different <code>Mapper</code>s may have output the same key) in this
 *   stage.</p>
 *
 *   <p>The shuffle and sort phases occur simultaneously, i.e. while outputs are
 *   being fetched they are merged.</p>
 *
 *   <h5 id="SecondarySort">SecondarySort</h5>
 *
 *   <p>If the equivalence rules for sorting the intermediate keys are
 *   different from those for grouping keys before reduction, then one may
 *   specify a <code>Comparator</code> via
 *   {@link JobConf#setOutputValueGroupingComparator(Class)}. Since
 *   {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to
 *   control how intermediate keys are sorted, these can be used in conjunction
 *   to simulate <i>secondary sort on values</i>.</p>
 *
 *   For example, say that you want to find duplicate web pages and tag them
 *   all with the url of the "best" known example.
 *   You would set up the job like:
 *   <ul>
 *     <li>Map Input Key: url</li>
 *     <li>Map Input Value: document</li>
 *     <li>Map Output Key: document checksum, url pagerank</li>
 *     <li>Map Output Value: url</li>
 *     <li>Partitioner: by checksum</li>
 *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
 *     <li>OutputValueGroupingComparator: by checksum</li>
 *   </ul>
 *   </li>
 *
 *   <li>
 *   <h4 id="Reduce">Reduce</h4>
 *
 *   <p>In this phase the
 *   {@link #reduce(Object, Iterator, OutputCollector, Reporter)}
 *   method is called for each <code>&lt;key, (list of values)&gt;</code> pair in
 *   the grouped inputs.</p>
 *   <p>The output of the reduce task is typically written to the
 *   {@link FileSystem} via
 *   {@link OutputCollector#collect(Object, Object)}.</p>
 *   </li>
 * </ol>
 *
 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
 *
 * <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyReducer&lt;K extends WritableComparable, V extends Writable&gt;
 *     extends MapReduceBase implements Reducer&lt;K, V, K, V&gt; {
 *
 *       static enum MyCounters { NUM_RECORDS }
 *
 *       private String reduceTaskId;
 *       private int noKeys = 0;
 *
 *       public void configure(JobConf job) {
 *         reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
 *       }
 *
 *       public void reduce(K key, Iterator&lt;V&gt; values,
 *                          OutputCollector&lt;K, V&gt; output,
 *                          Reporter reporter)
 *       throws IOException {
 *
 *         // Process
 *         int noValues = 0;
 *         while (values.hasNext()) {
 *           V value = values.next();
 *
 *           // Increment the no. of values for this key
 *           ++noValues;
 *
 *           // Process the &lt;key, value&gt; pair (assume this takes a while)
 *           // ...
 *           // ...
 *
 *           // Let the framework know that we are alive, and kicking!
 *           if ((noValues%10) == 0) {
 *             reporter.progress();
 *           }
 *
 *           // Process some more
 *           // ...
 *           // ...
 *
 *           // Output the &lt;key, value&gt;
 *           output.collect(key, value);
 *         }
 *
 *         // Increment the no. of &lt;key, list of values&gt; pairs processed
 *         ++noKeys;
 *
 *         // Increment counters (the enum constant must be qualified here)
 *         reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
 *
 *         // Every 100 keys update application-level status
 *         if ((noKeys%100) == 0) {
 *           reporter.setStatus(reduceTaskId + " processed " + noKeys);
 *         }
 *       }
 *     }
 * </pre></blockquote></p>
 *
 * @see Mapper
 * @see Partitioner
 * @see Reporter
 * @see MapReduceBase
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

  /**
   * <i>Reduces</i> values for a given key.
   *
   * <p>The framework calls this method for each
   * <code>&lt;key, (list of values)&gt;</code> pair in the grouped inputs.
   * Output values must be of the same type as input values.  Input keys must
   * not be altered. The framework will <b>reuse</b> the key and value objects
   * that are passed into the reduce, so applications should clone any
   * objects they want to keep a copy of. In many cases, all values are
   * combined into zero or one value.
   * </p>
   *
   * <p>Output pairs are collected with calls to
   * {@link OutputCollector#collect(Object,Object)}.</p>
   *
   * <p>Applications can use the {@link Reporter} provided to report progress
   * or just indicate that they are alive. In scenarios where the application
   * takes a significant amount of time to process individual key/value
   * pairs, this is crucial since the framework might assume that the task has
   * timed out and kill that task. The other way of avoiding this is to set
   * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
   * mapreduce.task.timeout</a> to a high-enough value (or even zero for no
   * time-outs).</p>
   *
   * @param key the key.
   * @param values the list of values to reduce.
   * @param output to collect keys and combined values.
   * @param reporter facility to report progress.
   */
  void reduce(K2 key, Iterator<V2> values,
              OutputCollector<K3, V3> output, Reporter reporter)
    throws IOException;

}
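
// The SecondarySort note in the Javadoc above (sort by checksum and then
// decreasing pagerank, group by checksum only) can be sketched without the
// framework. This is a minimal illustration using plain java.util
// comparators, not Hadoop code: SecondarySortSketch, Entry, SORT_ORDER,
// GROUPING and tagWithBestUrl are hypothetical names, and the two comparators
// only stand in for the roles of the OutputKeyComparator and the
// OutputValueGroupingComparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, framework-free illustration of secondary sort. */
final class SecondarySortSketch {

  /** Stand-in for one map output record: key (checksum, pagerank), value url. */
  static final class Entry {
    final String checksum;
    final double pagerank;
    final String url;

    Entry(String checksum, double pagerank, String url) {
      this.checksum = checksum;
      this.pagerank = pagerank;
      this.url = url;
    }
  }

  // Role of the output key comparator: by checksum, then decreasing pagerank.
  static final Comparator<Entry> SORT_ORDER =
      Comparator.comparing((Entry e) -> e.checksum)
          .thenComparing(
              Comparator.comparingDouble((Entry e) -> e.pagerank).reversed());

  // Role of the output value grouping comparator: by checksum only.
  static final Comparator<Entry> GROUPING =
      Comparator.comparing((Entry e) -> e.checksum);

  /** Maps every url to the highest-pagerank url that shares its checksum. */
  static Map<String, String> tagWithBestUrl(List<Entry> mapOutput) {
    List<Entry> sorted = new ArrayList<>(mapOutput);
    sorted.sort(SORT_ORDER);

    Map<String, String> tagged = new LinkedHashMap<>();
    Entry groupHead = null;
    for (Entry e : sorted) {
      // A new group starts whenever the grouping comparator sees a change;
      // its first entry carries the highest pagerank for that checksum.
      if (groupHead == null || GROUPING.compare(groupHead, e) != 0) {
        groupHead = e;
      }
      tagged.put(e.url, groupHead.url);
    }
    return tagged;
  }

  public static void main(String[] args) {
    List<Entry> mapOutput = new ArrayList<>();
    mapOutput.add(new Entry("c1", 0.2, "http://b.example/"));
    mapOutput.add(new Entry("c1", 0.9, "http://a.example/"));
    mapOutput.add(new Entry("c2", 0.5, "http://c.example/"));
    System.out.println(tagWithBestUrl(mapOutput));
  }
}
```

// Because the sort order is finer than the grouping, the first record of each
// group is the "best" one, so a single pass can tag the rest. In a real job
// the same effect would come from setting the two comparators on the JobConf
// rather than sorting in memory as this sketch does.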