001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.aggregate;
020    
021    import java.io.IOException;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.io.Text;
026    import org.apache.hadoop.io.Writable;
027    import org.apache.hadoop.io.WritableComparable;
028    import org.apache.hadoop.mapreduce.Reducer;
029    
030    /**
031     * This class implements the generic reducer of Aggregate.
032     */
033    @InterfaceAudience.Public
034    @InterfaceStability.Stable
035    public class ValueAggregatorReducer<K1 extends WritableComparable<?>,
036                                        V1 extends Writable>
037      extends Reducer<Text, Text, Text, Text> {
038    
039      public void setup(Context context) 
040          throws IOException, InterruptedException {
041        ValueAggregatorJobBase.setup(context.getConfiguration());
042      }
043    
044      /**
045       * @param key
046       *        the key is expected to be a Text object, whose prefix indicates
047       *        the type of aggregation to aggregate the values. In effect, data
048       *        driven computing is achieved. It is assumed that each aggregator's
049       *        getReport method emits appropriate output for the aggregator. This
050       *        may be further customized.
051       * @param values the values to be aggregated
052       * @param context 
053       */
054      public void reduce(Text key, Iterable<Text> values,
055          Context context) throws IOException, InterruptedException {
056        String keyStr = key.toString();
057        int pos = keyStr.indexOf(ValueAggregatorDescriptor.TYPE_SEPARATOR);
058        String type = keyStr.substring(0, pos);
059        keyStr = keyStr.substring(pos + 
060                   ValueAggregatorDescriptor.TYPE_SEPARATOR.length());
061        long uniqCount = context.getConfiguration().
062          getLong(UniqValueCount.MAX_NUM_UNIQUE_VALUES, Long.MAX_VALUE);
063        ValueAggregator aggregator = ValueAggregatorBaseDescriptor
064          .generateValueAggregator(type, uniqCount);
065        for (Text value : values) {
066          aggregator.addNextValue(value);
067        }
068    
069        String val = aggregator.getReport();
070        key = new Text(keyStr);
071        context.write(key, new Text(val));
072      }
073    }