001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapred.lib.aggregate; 020 021 import java.util.Map.Entry; 022 023 import org.apache.hadoop.classification.InterfaceAudience; 024 import org.apache.hadoop.classification.InterfaceStability; 025 import org.apache.hadoop.io.Text; 026 import org.apache.hadoop.mapred.JobConf; 027 028 /** 029 * This class implements the common functionalities of 030 * the subclasses of ValueAggregatorDescriptor class. 031 */ 032 @InterfaceAudience.Public 033 @InterfaceStability.Stable 034 public class ValueAggregatorBaseDescriptor extends org.apache.hadoop.mapreduce. 035 lib.aggregate.ValueAggregatorBaseDescriptor 036 implements ValueAggregatorDescriptor { 037 038 static public final String UNIQ_VALUE_COUNT = org.apache.hadoop.mapreduce. 039 lib.aggregate.ValueAggregatorBaseDescriptor.UNIQ_VALUE_COUNT; 040 041 static public final String LONG_VALUE_SUM = org.apache.hadoop.mapreduce. 042 lib.aggregate.ValueAggregatorBaseDescriptor.LONG_VALUE_SUM; 043 044 static public final String DOUBLE_VALUE_SUM = org.apache.hadoop.mapreduce. 045 lib.aggregate.ValueAggregatorBaseDescriptor.DOUBLE_VALUE_SUM; 046 047 static public final String VALUE_HISTOGRAM = org.apache.hadoop.mapreduce. 048 lib.aggregate.ValueAggregatorBaseDescriptor.VALUE_HISTOGRAM; 049 050 static public final String LONG_VALUE_MAX = org.apache.hadoop.mapreduce. 051 lib.aggregate.ValueAggregatorBaseDescriptor.LONG_VALUE_MAX; 052 053 static public final String LONG_VALUE_MIN = org.apache.hadoop.mapreduce. 054 lib.aggregate.ValueAggregatorBaseDescriptor.LONG_VALUE_MIN; 055 056 static public final String STRING_VALUE_MAX = org.apache.hadoop.mapreduce. 057 lib.aggregate.ValueAggregatorBaseDescriptor.STRING_VALUE_MAX; 058 059 static public final String STRING_VALUE_MIN = org.apache.hadoop.mapreduce. 060 lib.aggregate.ValueAggregatorBaseDescriptor.STRING_VALUE_MIN; 061 062 private static long maxNumItems = Long.MAX_VALUE; 063 064 /** 065 * 066 * @param type the aggregation type 067 * @param id the aggregation id 068 * @param val the val associated with the id to be aggregated 069 * @return an Entry whose key is the aggregation id prefixed with 070 * the aggregation type. 071 */ 072 public static Entry<Text, Text> generateEntry(String type, String id, Text val) { 073 return org.apache.hadoop.mapreduce.lib.aggregate. 074 ValueAggregatorBaseDescriptor.generateEntry(type, id, val); 075 } 076 077 /** 078 * 079 * @param type the aggregation type 080 * @return a value aggregator of the given type. 081 */ 082 static public ValueAggregator generateValueAggregator(String type) { 083 ValueAggregator retv = null; 084 if (type.compareToIgnoreCase(LONG_VALUE_SUM) == 0) { 085 retv = new LongValueSum(); 086 } if (type.compareToIgnoreCase(LONG_VALUE_MAX) == 0) { 087 retv = new LongValueMax(); 088 } else if (type.compareToIgnoreCase(LONG_VALUE_MIN) == 0) { 089 retv = new LongValueMin(); 090 } else if (type.compareToIgnoreCase(STRING_VALUE_MAX) == 0) { 091 retv = new StringValueMax(); 092 } else if (type.compareToIgnoreCase(STRING_VALUE_MIN) == 0) { 093 retv = new StringValueMin(); 094 } else if (type.compareToIgnoreCase(DOUBLE_VALUE_SUM) == 0) { 095 retv = new DoubleValueSum(); 096 } else if (type.compareToIgnoreCase(UNIQ_VALUE_COUNT) == 0) { 097 retv = new UniqValueCount(maxNumItems); 098 } else if (type.compareToIgnoreCase(VALUE_HISTOGRAM) == 0) { 099 retv = new ValueHistogram(); 100 } 101 return retv; 102 } 103 104 /** 105 * get the input file name. 106 * 107 * @param job a job configuration object 108 */ 109 public void configure(JobConf job) { 110 super.configure(job); 111 maxNumItems = job.getLong("aggregate.max.num.unique.values", 112 Long.MAX_VALUE); 113 } 114 }