001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.aggregate;
020
021 import java.util.ArrayList;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.io.Writable;
027 import org.apache.hadoop.io.WritableComparable;
028
029 /**
030 * This abstract class implements some common functionalities of the
031 * the generic mapper, reducer and combiner classes of Aggregate.
032 */
033 @InterfaceAudience.Public
034 @InterfaceStability.Stable
035 public class ValueAggregatorJobBase<K1 extends WritableComparable<?>,
036 V1 extends Writable>
037 {
038 public static final String DESCRIPTOR = "mapreduce.aggregate.descriptor";
039 public static final String DESCRIPTOR_NUM =
040 "mapreduce.aggregate.descriptor.num";
041 public static final String USER_JAR = "mapreduce.aggregate.user.jar.file";
042
043 protected static ArrayList<ValueAggregatorDescriptor> aggregatorDescriptorList = null;
044
045 public static void setup(Configuration job) {
046 initializeMySpec(job);
047 logSpec();
048 }
049
050 protected static ValueAggregatorDescriptor getValueAggregatorDescriptor(
051 String spec, Configuration conf) {
052 if (spec == null)
053 return null;
054 String[] segments = spec.split(",", -1);
055 String type = segments[0];
056 if (type.compareToIgnoreCase("UserDefined") == 0) {
057 String className = segments[1];
058 return new UserDefinedValueAggregatorDescriptor(className, conf);
059 }
060 return null;
061 }
062
063 protected static ArrayList<ValueAggregatorDescriptor> getAggregatorDescriptors(
064 Configuration conf) {
065 int num = conf.getInt(DESCRIPTOR_NUM, 0);
066 ArrayList<ValueAggregatorDescriptor> retv =
067 new ArrayList<ValueAggregatorDescriptor>(num);
068 for (int i = 0; i < num; i++) {
069 String spec = conf.get(DESCRIPTOR + "." + i);
070 ValueAggregatorDescriptor ad = getValueAggregatorDescriptor(spec, conf);
071 if (ad != null) {
072 retv.add(ad);
073 }
074 }
075 return retv;
076 }
077
078 private static void initializeMySpec(Configuration conf) {
079 aggregatorDescriptorList = getAggregatorDescriptors(conf);
080 if (aggregatorDescriptorList.size() == 0) {
081 aggregatorDescriptorList
082 .add(new UserDefinedValueAggregatorDescriptor(
083 ValueAggregatorBaseDescriptor.class.getCanonicalName(), conf));
084 }
085 }
086
087 protected static void logSpec() {
088 }
089 }