001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.aggregate;
020    
021    import java.lang.reflect.Constructor;
022    import java.util.ArrayList;
023    import java.util.Map.Entry;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.conf.Configuration;
028    import org.apache.hadoop.io.Text;
029    
030    /**
031     * This class implements a wrapper for a user defined value 
032     * aggregator descriptor.
033     * It serves two functions: One is to create an object of 
034     * ValueAggregatorDescriptor from the name of a user defined class
035     * that may be dynamically loaded. The other is to
036     * delegate invocations of generateKeyValPairs function to the created object.
037     * 
038     */
039    @InterfaceAudience.Public
040    @InterfaceStability.Stable
041    public class UserDefinedValueAggregatorDescriptor implements
042        ValueAggregatorDescriptor {
043      private String className;
044    
045      protected ValueAggregatorDescriptor theAggregatorDescriptor = null;
046    
047      private static final Class<?>[] argArray = new Class[] {};
048    
049      /**
050       * Create an instance of the given class
051       * @param className the name of the class
052       * @return a dynamically created instance of the given class 
053       */
054      public static Object createInstance(String className) {
055        Object retv = null;
056        try {
057          ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
058          Class<?> theFilterClass = Class.forName(className, true, classLoader);
059          Constructor<?> meth = theFilterClass.getDeclaredConstructor(argArray);
060          meth.setAccessible(true);
061          retv = meth.newInstance();
062        } catch (Exception e) {
063          throw new RuntimeException(e);
064        }
065        return retv;
066      }
067    
068      private void createAggregator(Configuration conf) {
069        if (theAggregatorDescriptor == null) {
070          theAggregatorDescriptor = (ValueAggregatorDescriptor)
071                                      createInstance(this.className);
072          theAggregatorDescriptor.configure(conf);
073        }
074      }
075    
076      /**
077       * 
078       * @param className the class name of the user defined descriptor class
079       * @param conf a configure object used for decriptor configuration
080       */
081      public UserDefinedValueAggregatorDescriptor(String className, 
082          Configuration conf) {
083        this.className = className;
084        this.createAggregator(conf);
085      }
086    
087      /**
088       *   Generate a list of aggregation-id/value pairs for the given 
089       *   key/value pairs by delegating the invocation to the real object.
090       *   
091       * @param key
092       *          input key
093       * @param val
094       *          input value
095       * @return a list of aggregation id/value pairs. An aggregation id encodes an
096       *         aggregation type which is used to guide the way to aggregate the
097       *         value in the reduce/combiner phrase of an Aggregate based job.
098       */
099      public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
100                                                              Object val) {
101        ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
102        if (this.theAggregatorDescriptor != null) {
103          retv = this.theAggregatorDescriptor.generateKeyValPairs(key, val);
104        }
105        return retv;
106      }
107    
108      /**
109       * @return the string representation of this object.
110       */
111      public String toString() {
112        return "UserDefinedValueAggregatorDescriptor with class name:" + "\t"
113          + this.className;
114      }
115    
116      /**
117       *  Do nothing.
118       */
119      public void configure(Configuration conf) {
120    
121      }
122    
123    }