001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce; 020 021import java.io.IOException; 022 023import org.apache.hadoop.classification.InterfaceAudience; 024import org.apache.hadoop.classification.InterfaceStability; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.mapreduce.task.annotation.Checkpointable; 027 028import java.util.Iterator; 029 030/** 031 * Reduces a set of intermediate values which share a key to a smaller set of 032 * values. 033 * 034 * <p><code>Reducer</code> implementations 035 * can access the {@link Configuration} for the job via the 036 * {@link JobContext#getConfiguration()} method.</p> 037 038 * <p><code>Reducer</code> has 3 primary phases:</p> 039 * <ol> 040 * <li> 041 * 042 * <b id="Shuffle">Shuffle</b> 043 * 044 * <p>The <code>Reducer</code> copies the sorted output from each 045 * {@link Mapper} using HTTP across the network.</p> 046 * </li> 047 * 048 * <li> 049 * <b id="Sort">Sort</b> 050 * 051 * <p>The framework merge sorts <code>Reducer</code> inputs by 052 * <code>key</code>s 053 * (since different <code>Mapper</code>s may have output the same key).</p> 054 * 055 * <p>The shuffle and sort phases occur simultaneously i.e. while outputs are 056 * being fetched they are merged.</p> 057 * 058 * <b id="SecondarySort">SecondarySort</b> 059 * 060 * <p>To achieve a secondary sort on the values returned by the value 061 * iterator, the application should extend the key with the secondary 062 * key and define a grouping comparator. The keys will be sorted using the 063 * entire key, but will be grouped using the grouping comparator to decide 064 * which keys and values are sent in the same call to reduce.The grouping 065 * comparator is specified via 066 * {@link Job#setGroupingComparatorClass(Class)}. The sort order is 067 * controlled by 068 * {@link Job#setSortComparatorClass(Class)}.</p> 069 * 070 * 071 * For example, say that you want to find duplicate web pages and tag them 072 * all with the url of the "best" known example. You would set up the job 073 * like: 074 * <ul> 075 * <li>Map Input Key: url</li> 076 * <li>Map Input Value: document</li> 077 * <li>Map Output Key: document checksum, url pagerank</li> 078 * <li>Map Output Value: url</li> 079 * <li>Partitioner: by checksum</li> 080 * <li>OutputKeyComparator: by checksum and then decreasing pagerank</li> 081 * <li>OutputValueGroupingComparator: by checksum</li> 082 * </ul> 083 * </li> 084 * 085 * <li> 086 * <b id="Reduce">Reduce</b> 087 * 088 * <p>In this phase the 089 * {@link #reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)} 090 * method is called for each <code><key, (collection of values)></code> in 091 * the sorted inputs.</p> 092 * <p>The output of the reduce task is typically written to a 093 * {@link RecordWriter} via 094 * {@link Context#write(Object, Object)}.</p> 095 * </li> 096 * </ol> 097 * 098 * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p> 099 * 100 * <p>Example:</p> 101 * <p><blockquote><pre> 102 * public class IntSumReducer<Key> extends Reducer<Key,IntWritable, 103 * Key,IntWritable> { 104 * private IntWritable result = new IntWritable(); 105 * 106 * public void reduce(Key key, Iterable<IntWritable> values, 107 * Context context) throws IOException, InterruptedException { 108 * int sum = 0; 109 * for (IntWritable val : values) { 110 * sum += val.get(); 111 * } 112 * result.set(sum); 113 * context.write(key, result); 114 * } 115 * } 116 * </pre></blockquote> 117 * 118 * @see Mapper 119 * @see Partitioner 120 */ 121@Checkpointable 122@InterfaceAudience.Public 123@InterfaceStability.Stable 124public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { 125 126 /** 127 * The <code>Context</code> passed on to the {@link Reducer} implementations. 128 */ 129 public abstract class Context 130 implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { 131 } 132 133 /** 134 * Called once at the start of the task. 135 */ 136 protected void setup(Context context 137 ) throws IOException, InterruptedException { 138 // NOTHING 139 } 140 141 /** 142 * This method is called once for each key. Most applications will define 143 * their reduce class by overriding this method. The default implementation 144 * is an identity function. 145 */ 146 @SuppressWarnings("unchecked") 147 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context 148 ) throws IOException, InterruptedException { 149 for(VALUEIN value: values) { 150 context.write((KEYOUT) key, (VALUEOUT) value); 151 } 152 } 153 154 /** 155 * Called once at the end of the task. 156 */ 157 protected void cleanup(Context context 158 ) throws IOException, InterruptedException { 159 // NOTHING 160 } 161 162 /** 163 * Advanced application writers can use the 164 * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to 165 * control how the reduce task works. 166 */ 167 public void run(Context context) throws IOException, InterruptedException { 168 setup(context); 169 try { 170 while (context.nextKey()) { 171 reduce(context.getCurrentKey(), context.getValues(), context); 172 // If a back up store is used, reset it 173 Iterator<VALUEIN> iter = context.getValues().iterator(); 174 if(iter instanceof ReduceContext.ValueIterator) { 175 ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); 176 } 177 } 178 } finally { 179 cleanup(context); 180 } 181 } 182}