001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.aggregate;
020
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Map.Entry;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
029
030 /**
031 * This class implements the common functionalities of
032 * the subclasses of ValueAggregatorDescriptor class.
033 */
034 @InterfaceAudience.Public
035 @InterfaceStability.Stable
036 public class ValueAggregatorBaseDescriptor
037 implements ValueAggregatorDescriptor {
038
039 static public final String UNIQ_VALUE_COUNT = "UniqValueCount";
040
041 static public final String LONG_VALUE_SUM = "LongValueSum";
042
043 static public final String DOUBLE_VALUE_SUM = "DoubleValueSum";
044
045 static public final String VALUE_HISTOGRAM = "ValueHistogram";
046
047 static public final String LONG_VALUE_MAX = "LongValueMax";
048
049 static public final String LONG_VALUE_MIN = "LongValueMin";
050
051 static public final String STRING_VALUE_MAX = "StringValueMax";
052
053 static public final String STRING_VALUE_MIN = "StringValueMin";
054
055 public String inputFile = null;
056
057 private static class MyEntry implements Entry<Text, Text> {
058 Text key;
059
060 Text val;
061
062 public Text getKey() {
063 return key;
064 }
065
066 public Text getValue() {
067 return val;
068 }
069
070 public Text setValue(Text val) {
071 this.val = val;
072 return val;
073 }
074
075 public MyEntry(Text key, Text val) {
076 this.key = key;
077 this.val = val;
078 }
079 }
080
081 /**
082 *
083 * @param type the aggregation type
084 * @param id the aggregation id
085 * @param val the val associated with the id to be aggregated
086 * @return an Entry whose key is the aggregation id prefixed with
087 * the aggregation type.
088 */
089 public static Entry<Text, Text> generateEntry(String type,
090 String id, Text val) {
091 Text key = new Text(type + TYPE_SEPARATOR + id);
092 return new MyEntry(key, val);
093 }
094
095 /**
096 *
097 * @param type the aggregation type
098 * @param uniqCount the limit in the number of unique values to keep,
099 * if type is UNIQ_VALUE_COUNT
100 * @return a value aggregator of the given type.
101 */
102 static public ValueAggregator generateValueAggregator(String type, long uniqCount) {
103 if (type.compareToIgnoreCase(LONG_VALUE_SUM) == 0) {
104 return new LongValueSum();
105 } if (type.compareToIgnoreCase(LONG_VALUE_MAX) == 0) {
106 return new LongValueMax();
107 } else if (type.compareToIgnoreCase(LONG_VALUE_MIN) == 0) {
108 return new LongValueMin();
109 } else if (type.compareToIgnoreCase(STRING_VALUE_MAX) == 0) {
110 return new StringValueMax();
111 } else if (type.compareToIgnoreCase(STRING_VALUE_MIN) == 0) {
112 return new StringValueMin();
113 } else if (type.compareToIgnoreCase(DOUBLE_VALUE_SUM) == 0) {
114 return new DoubleValueSum();
115 } else if (type.compareToIgnoreCase(UNIQ_VALUE_COUNT) == 0) {
116 return new UniqValueCount(uniqCount);
117 } else if (type.compareToIgnoreCase(VALUE_HISTOGRAM) == 0) {
118 return new ValueHistogram();
119 }
120 return null;
121 }
122
123 /**
124 * Generate 1 or 2 aggregation-id/value pairs for the given key/value pair.
125 * The first id will be of type LONG_VALUE_SUM, with "record_count" as
126 * its aggregation id. If the input is a file split,
127 * the second id of the same type will be generated too, with the file name
128 * as its aggregation id. This achieves the behavior of counting the total
129 * number of records in the input data, and the number of records
130 * in each input file.
131 *
132 * @param key
133 * input key
134 * @param val
135 * input value
136 * @return a list of aggregation id/value pairs. An aggregation id encodes an
137 * aggregation type which is used to guide the way to aggregate the
138 * value in the reduce/combiner phrase of an Aggregate based job.
139 */
140 public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
141 Object val) {
142 ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
143 String countType = LONG_VALUE_SUM;
144 String id = "record_count";
145 Entry<Text, Text> e = generateEntry(countType, id, ONE);
146 if (e != null) {
147 retv.add(e);
148 }
149 if (this.inputFile != null) {
150 e = generateEntry(countType, this.inputFile, ONE);
151 if (e != null) {
152 retv.add(e);
153 }
154 }
155 return retv;
156 }
157
158 /**
159 * get the input file name.
160 *
161 * @param conf a configuration object
162 */
163 public void configure(Configuration conf) {
164 this.inputFile = conf.get(MRJobConfig.MAP_INPUT_FILE);
165 }
166 }