001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.aggregate;
020    
021    import java.util.ArrayList;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.io.Writable;
027    import org.apache.hadoop.io.WritableComparable;
028    
029    /**
030     * This abstract class implements some common functionalities of the
031     * the generic mapper, reducer and combiner classes of Aggregate.
032     */
033    @InterfaceAudience.Public
034    @InterfaceStability.Stable
035    public class ValueAggregatorJobBase<K1 extends WritableComparable<?>,
036                                                 V1 extends Writable>
037    {
038      public static final String DESCRIPTOR = "mapreduce.aggregate.descriptor";
039      public static final String DESCRIPTOR_NUM = 
040        "mapreduce.aggregate.descriptor.num";
041      public static final String USER_JAR = "mapreduce.aggregate.user.jar.file";
042      
043      protected static ArrayList<ValueAggregatorDescriptor> aggregatorDescriptorList = null;
044    
045      public static void setup(Configuration job) {
046        initializeMySpec(job);
047        logSpec();
048      }
049    
050      protected static ValueAggregatorDescriptor getValueAggregatorDescriptor(
051          String spec, Configuration conf) {
052        if (spec == null)
053          return null;
054        String[] segments = spec.split(",", -1);
055        String type = segments[0];
056        if (type.compareToIgnoreCase("UserDefined") == 0) {
057          String className = segments[1];
058          return new UserDefinedValueAggregatorDescriptor(className, conf);
059        }
060        return null;
061      }
062    
063      protected static ArrayList<ValueAggregatorDescriptor> getAggregatorDescriptors(
064          Configuration conf) {
065        int num = conf.getInt(DESCRIPTOR_NUM, 0);
066        ArrayList<ValueAggregatorDescriptor> retv = 
067          new ArrayList<ValueAggregatorDescriptor>(num);
068        for (int i = 0; i < num; i++) {
069          String spec = conf.get(DESCRIPTOR + "." + i);
070          ValueAggregatorDescriptor ad = getValueAggregatorDescriptor(spec, conf);
071          if (ad != null) {
072            retv.add(ad);
073          }
074        }
075        return retv;
076      }
077    
078      private static void initializeMySpec(Configuration conf) {
079        aggregatorDescriptorList = getAggregatorDescriptors(conf);
080        if (aggregatorDescriptorList.size() == 0) {
081          aggregatorDescriptorList
082              .add(new UserDefinedValueAggregatorDescriptor(
083                  ValueAggregatorBaseDescriptor.class.getCanonicalName(), conf));
084        }
085      }
086    
087      protected static void logSpec() {
088      }
089    }