/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.aggregate;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * This class implements the generic reducer of Aggregate.
 *
 * @param <K1> not referenced by the body of this class; kept so existing
 *             parameterized references keep compiling
 * @param <V1> not referenced by the body of this class; kept so existing
 *             parameterized references keep compiling
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ValueAggregatorReducer<K1 extends WritableComparable<?>,
                                    V1 extends Writable>
    extends Reducer<Text, Text, Text, Text> {

  /**
   * Hands the job configuration to {@link ValueAggregatorJobBase#setup}.
   *
   * @param context the reducer context whose configuration is passed on
   */
  @Override
  public void setup(Context context)
      throws IOException, InterruptedException {
    ValueAggregatorJobBase.setup(context.getConfiguration());
  }

  /**
   * Aggregates all values of one key and writes a single output record.
   *
   * @param key
   *          the key is expected to be a Text object, whose prefix indicates
   *          the type of aggregation to aggregate the values. The prefix is
   *          everything before the first occurrence of
   *          {@link ValueAggregatorDescriptor#TYPE_SEPARATOR}; the remainder
   *          after the separator becomes the output key. In effect, data
   *          driven computing is achieved. It is assumed that each
   *          aggregator's getReport method emits appropriate output for the
   *          aggregator. This may be further customized.
   * @param values the values to be aggregated
   * @param context the reducer context; supplies the configuration (read for
   *          {@link UniqValueCount#MAX_NUM_UNIQUE_VALUES}) and receives the
   *          output record
   * @throws IOException if the key does not contain the type separator, or
   *           if one is raised while writing the output record
   * @throws InterruptedException if one is raised while writing the output
   *           record
   */
  @Override
  public void reduce(Text key, Iterable<Text> values,
      Context context) throws IOException, InterruptedException {
    String keyStr = key.toString();
    int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
    if (pos < 0) {
      // Without this check substring(0, -1) fails with an index error that
      // names neither the key nor the reason.
      throw new IOException("Aggregate key '" + keyStr
          + "' does not contain the type separator '"
          + ValueAggregatorDescriptor.TYPE_SEPARATOR + "'");
    }
    String type = keyStr.substring(0, pos);
    String id = keyStr.substring(
        pos + ValueAggregatorDescriptor.TYPE_SEPARATOR.length());

    // Falls back to Long.MAX_VALUE when the limit is not configured.
    long uniqCount = context.getConfiguration().getLong(
        UniqValueCount.MAX_NUM_UNIQUE_VALUES, Long.MAX_VALUE);
    ValueAggregator aggregator = ValueAggregatorBaseDescriptor
        .generateValueAggregator(type, uniqCount);
    for (Text value : values) {
      aggregator.addNextValue(value);
    }

    // Emit under the id alone: the type prefix is stripped from the output.
    String val = aggregator.getReport();
    context.write(new Text(id), new Text(val));
  }
}