001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.aggregate;
020    
021    import java.util.ArrayList;
022    import java.util.Map.Entry;
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.io.Text;
028    import org.apache.hadoop.mapreduce.MRJobConfig;
029    
030    /** 
031     * This class implements the common functionalities of 
032     * the subclasses of ValueAggregatorDescriptor class.
033     */
034    @InterfaceAudience.Public
035    @InterfaceStability.Stable
036    public class ValueAggregatorBaseDescriptor 
037        implements ValueAggregatorDescriptor {
038    
039      static public final String UNIQ_VALUE_COUNT = "UniqValueCount";
040    
041      static public final String LONG_VALUE_SUM = "LongValueSum";
042    
043      static public final String DOUBLE_VALUE_SUM = "DoubleValueSum";
044    
045      static public final String VALUE_HISTOGRAM = "ValueHistogram";
046      
047      static public final String LONG_VALUE_MAX = "LongValueMax";
048      
049      static public final String LONG_VALUE_MIN = "LongValueMin";
050      
051      static public final String STRING_VALUE_MAX = "StringValueMax";
052      
053      static public final String STRING_VALUE_MIN = "StringValueMin";
054      
055      public String inputFile = null;
056    
057      private static class MyEntry implements Entry<Text, Text> {
058        Text key;
059    
060        Text val;
061    
062        public Text getKey() {
063          return key;
064        }
065    
066        public Text getValue() {
067          return val;
068        }
069    
070        public Text setValue(Text val) {
071          this.val = val;
072          return val;
073        }
074    
075        public MyEntry(Text key, Text val) {
076          this.key = key;
077          this.val = val;
078        }
079      }
080    
081      /**
082       * 
083       * @param type the aggregation type
084       * @param id the aggregation id
085       * @param val the val associated with the id to be aggregated
086       * @return an Entry whose key is the aggregation id prefixed with 
087       * the aggregation type.
088       */
089      public static Entry<Text, Text> generateEntry(String type, 
090          String id, Text val) {
091        Text key = new Text(type + TYPE_SEPARATOR + id);
092        return new MyEntry(key, val);
093      }
094    
095      /**
096       * 
097       * @param type the aggregation type
098       * @param uniqCount the limit in the number of unique values to keep, 
099       *                  if type is UNIQ_VALUE_COUNT 
100       * @return a value aggregator of the given type.
101       */
102      static public ValueAggregator generateValueAggregator(String type, long uniqCount) {
103        if (type.compareToIgnoreCase(LONG_VALUE_SUM) == 0) {
104          return new LongValueSum();
105        } if (type.compareToIgnoreCase(LONG_VALUE_MAX) == 0) {
106          return new LongValueMax();
107        } else if (type.compareToIgnoreCase(LONG_VALUE_MIN) == 0) {
108          return new LongValueMin();
109        } else if (type.compareToIgnoreCase(STRING_VALUE_MAX) == 0) {
110          return new StringValueMax();
111        } else if (type.compareToIgnoreCase(STRING_VALUE_MIN) == 0) {
112          return new StringValueMin();
113        } else if (type.compareToIgnoreCase(DOUBLE_VALUE_SUM) == 0) {
114          return new DoubleValueSum();
115        } else if (type.compareToIgnoreCase(UNIQ_VALUE_COUNT) == 0) {
116          return new UniqValueCount(uniqCount);
117        } else if (type.compareToIgnoreCase(VALUE_HISTOGRAM) == 0) {
118          return new ValueHistogram();
119        }
120        return null;
121      }
122    
123      /**
124       * Generate 1 or 2 aggregation-id/value pairs for the given key/value pair.
125       * The first id will be of type LONG_VALUE_SUM, with "record_count" as
126       * its aggregation id. If the input is a file split,
127       * the second id of the same type will be generated too, with the file name 
128       * as its aggregation id. This achieves the behavior of counting the total 
129       * number of records in the input data, and the number of records 
130       * in each input file.
131       * 
132       * @param key
133       *          input key
134       * @param val
135       *          input value
136       * @return a list of aggregation id/value pairs. An aggregation id encodes an
137       *         aggregation type which is used to guide the way to aggregate the
138       *         value in the reduce/combiner phrase of an Aggregate based job.
139       */
140      public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
141                                                              Object val) {
142        ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
143        String countType = LONG_VALUE_SUM;
144        String id = "record_count";
145        Entry<Text, Text> e = generateEntry(countType, id, ONE);
146        if (e != null) {
147          retv.add(e);
148        }
149        if (this.inputFile != null) {
150          e = generateEntry(countType, this.inputFile, ONE);
151          if (e != null) {
152            retv.add(e);
153          }
154        }
155        return retv;
156      }
157    
158      /**
159       * get the input file name.
160       * 
161       * @param conf a configuration object
162       */
163      public void configure(Configuration conf) {
164        this.inputFile = conf.get(MRJobConfig.MAP_INPUT_FILE);
165      }
166    }