001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.lib.aggregate;
020    
021    import java.util.Map.Entry;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.io.Text;
026    import org.apache.hadoop.mapred.JobConf;
027    
028    /** 
029     * This class implements the common functionalities of 
030     * the subclasses of ValueAggregatorDescriptor class.
031     */
032    @InterfaceAudience.Public
033    @InterfaceStability.Stable
034    public class ValueAggregatorBaseDescriptor extends org.apache.hadoop.mapreduce.
035        lib.aggregate.ValueAggregatorBaseDescriptor 
036        implements ValueAggregatorDescriptor {
037    
038      static public final String UNIQ_VALUE_COUNT = org.apache.hadoop.mapreduce.
039        lib.aggregate.ValueAggregatorBaseDescriptor.UNIQ_VALUE_COUNT;
040    
041      static public final String LONG_VALUE_SUM = org.apache.hadoop.mapreduce.
042        lib.aggregate.ValueAggregatorBaseDescriptor.LONG_VALUE_SUM;
043    
044      static public final String DOUBLE_VALUE_SUM = org.apache.hadoop.mapreduce.
045        lib.aggregate.ValueAggregatorBaseDescriptor.DOUBLE_VALUE_SUM;
046    
047      static public final String VALUE_HISTOGRAM = org.apache.hadoop.mapreduce.
048        lib.aggregate.ValueAggregatorBaseDescriptor.VALUE_HISTOGRAM;
049      
050      static public final String LONG_VALUE_MAX = org.apache.hadoop.mapreduce.
051        lib.aggregate.ValueAggregatorBaseDescriptor.LONG_VALUE_MAX;
052      
053      static public final String LONG_VALUE_MIN = org.apache.hadoop.mapreduce.
054        lib.aggregate.ValueAggregatorBaseDescriptor.LONG_VALUE_MIN;
055      
056      static public final String STRING_VALUE_MAX = org.apache.hadoop.mapreduce.
057        lib.aggregate.ValueAggregatorBaseDescriptor.STRING_VALUE_MAX;
058      
059      static public final String STRING_VALUE_MIN = org.apache.hadoop.mapreduce.
060        lib.aggregate.ValueAggregatorBaseDescriptor.STRING_VALUE_MIN;
061    
062      private static long maxNumItems = Long.MAX_VALUE; 
063      
064     /**
065       * 
066       * @param type the aggregation type
067       * @param id the aggregation id
068       * @param val the val associated with the id to be aggregated
069       * @return an Entry whose key is the aggregation id prefixed with 
070       * the aggregation type.
071       */
072      public static Entry<Text, Text> generateEntry(String type, String id, Text val) {
073        return org.apache.hadoop.mapreduce.lib.aggregate.
074          ValueAggregatorBaseDescriptor.generateEntry(type, id, val);
075      }
076    
077      /**
078       * 
079       * @param type the aggregation type
080       * @return a value aggregator of the given type.
081       */
082      static public ValueAggregator generateValueAggregator(String type) {
083        ValueAggregator retv = null;
084        if (type.compareToIgnoreCase(LONG_VALUE_SUM) == 0) {
085          retv = new LongValueSum();
086        } if (type.compareToIgnoreCase(LONG_VALUE_MAX) == 0) {
087          retv = new LongValueMax();
088        } else if (type.compareToIgnoreCase(LONG_VALUE_MIN) == 0) {
089          retv = new LongValueMin();
090        } else if (type.compareToIgnoreCase(STRING_VALUE_MAX) == 0) {
091          retv = new StringValueMax();
092        } else if (type.compareToIgnoreCase(STRING_VALUE_MIN) == 0) {
093          retv = new StringValueMin();
094        } else if (type.compareToIgnoreCase(DOUBLE_VALUE_SUM) == 0) {
095          retv = new DoubleValueSum();
096        } else if (type.compareToIgnoreCase(UNIQ_VALUE_COUNT) == 0) {
097          retv = new UniqValueCount(maxNumItems);
098        } else if (type.compareToIgnoreCase(VALUE_HISTOGRAM) == 0) {
099          retv = new ValueHistogram();
100        }
101        return retv;
102      }
103    
104      /**
105       * get the input file name.
106       * 
107       * @param job a job configuration object
108       */
109      public void configure(JobConf job) {
110        super.configure(job);
111        maxNumItems = job.getLong("aggregate.max.num.unique.values",
112                                  Long.MAX_VALUE);
113      }
114    }