001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce; 020 021import java.io.IOException; 022 023import org.apache.hadoop.classification.InterfaceAudience; 024import org.apache.hadoop.classification.InterfaceStability; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.io.RawComparator; 027import org.apache.hadoop.mapred.RawKeyValueIterator; 028 029import java.util.Iterator; 030 031/** 032 * Reduces a set of intermediate values which share a key to a smaller set of 033 * values. 034 * 035 * <p><code>Reducer</code> implementations 036 * can access the {@link Configuration} for the job via the 037 * {@link JobContext#getConfiguration()} method.</p> 038 039 * <p><code>Reducer</code> has 3 primary phases:</p> 040 * <ol> 041 * <li> 042 * 043 * <h4 id="Shuffle">Shuffle</h4> 044 * 045 * <p>The <code>Reducer</code> copies the sorted output from each 046 * {@link Mapper} using HTTP across the network.</p> 047 * </li> 048 * 049 * <li> 050 * <h4 id="Sort">Sort</h4> 051 * 052 * <p>The framework merge sorts <code>Reducer</code> inputs by 053 * <code>key</code>s 054 * (since different <code>Mapper</code>s may have output the same key).</p> 055 * 056 * <p>The shuffle and sort phases occur simultaneously i.e. while outputs are 057 * being fetched they are merged.</p> 058 * 059 * <h5 id="SecondarySort">SecondarySort</h5> 060 * 061 * <p>To achieve a secondary sort on the values returned by the value 062 * iterator, the application should extend the key with the secondary 063 * key and define a grouping comparator. The keys will be sorted using the 064 * entire key, but will be grouped using the grouping comparator to decide 065 * which keys and values are sent in the same call to reduce.The grouping 066 * comparator is specified via 067 * {@link Job#setGroupingComparatorClass(Class)}. The sort order is 068 * controlled by 069 * {@link Job#setSortComparatorClass(Class)}.</p> 070 * 071 * 072 * For example, say that you want to find duplicate web pages and tag them 073 * all with the url of the "best" known example. You would set up the job 074 * like: 075 * <ul> 076 * <li>Map Input Key: url</li> 077 * <li>Map Input Value: document</li> 078 * <li>Map Output Key: document checksum, url pagerank</li> 079 * <li>Map Output Value: url</li> 080 * <li>Partitioner: by checksum</li> 081 * <li>OutputKeyComparator: by checksum and then decreasing pagerank</li> 082 * <li>OutputValueGroupingComparator: by checksum</li> 083 * </ul> 084 * </li> 085 * 086 * <li> 087 * <h4 id="Reduce">Reduce</h4> 088 * 089 * <p>In this phase the 090 * {@link #reduce(Object, Iterable, Context)} 091 * method is called for each <code><key, (collection of values)></code> in 092 * the sorted inputs.</p> 093 * <p>The output of the reduce task is typically written to a 094 * {@link RecordWriter} via 095 * {@link Context#write(Object, Object)}.</p> 096 * </li> 097 * </ol> 098 * 099 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p> 100 * 101 * <p>Example:</p> 102 * <p><blockquote><pre> 103 * public class IntSumReducer<Key> extends Reducer<Key,IntWritable, 104 * Key,IntWritable> { 105 * private IntWritable result = new IntWritable(); 106 * 107 * public void reduce(Key key, Iterable<IntWritable> values, 108 * Context context) throws IOException, InterruptedException { 109 * int sum = 0; 110 * for (IntWritable val : values) { 111 * sum += val.get(); 112 * } 113 * result.set(sum); 114 * context.write(key, result); 115 * } 116 * } 117 * </pre></blockquote></p> 118 * 119 * @see Mapper 120 * @see Partitioner 121 */ 122@InterfaceAudience.Public 123@InterfaceStability.Stable 124public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { 125 126 /** 127 * The <code>Context</code> passed on to the {@link Reducer} implementations. 128 */ 129 public abstract class Context 130 implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { 131 } 132 133 /** 134 * Called once at the start of the task. 135 */ 136 protected void setup(Context context 137 ) throws IOException, InterruptedException { 138 // NOTHING 139 } 140 141 /** 142 * This method is called once for each key. Most applications will define 143 * their reduce class by overriding this method. The default implementation 144 * is an identity function. 145 */ 146 @SuppressWarnings("unchecked") 147 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context 148 ) throws IOException, InterruptedException { 149 for(VALUEIN value: values) { 150 context.write((KEYOUT) key, (VALUEOUT) value); 151 } 152 } 153 154 /** 155 * Called once at the end of the task. 156 */ 157 protected void cleanup(Context context 158 ) throws IOException, InterruptedException { 159 // NOTHING 160 } 161 162 /** 163 * Advanced application writers can use the 164 * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to 165 * control how the reduce task works. 166 */ 167 public void run(Context context) throws IOException, InterruptedException { 168 setup(context); 169 while (context.nextKey()) { 170 reduce(context.getCurrentKey(), context.getValues(), context); 171 // If a back up store is used, reset it 172 Iterator<VALUEIN> iter = context.getValues().iterator(); 173 if(iter instanceof ReduceContext.ValueIterator) { 174 ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); 175 } 176 } 177 cleanup(context); 178 } 179}