001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.aggregate;
020
021 import java.lang.reflect.Constructor;
022 import java.util.ArrayList;
023 import java.util.Map.Entry;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.conf.Configuration;
028 import org.apache.hadoop.io.Text;
029
030 /**
031 * This class implements a wrapper for a user defined value
032 * aggregator descriptor.
033 * It serves two functions: One is to create an object of
034 * ValueAggregatorDescriptor from the name of a user defined class
035 * that may be dynamically loaded. The other is to
036 * delegate invocations of generateKeyValPairs function to the created object.
037 *
038 */
039 @InterfaceAudience.Public
040 @InterfaceStability.Stable
041 public class UserDefinedValueAggregatorDescriptor implements
042 ValueAggregatorDescriptor {
043 private String className;
044
045 protected ValueAggregatorDescriptor theAggregatorDescriptor = null;
046
047 private static final Class<?>[] argArray = new Class[] {};
048
049 /**
050 * Create an instance of the given class
051 * @param className the name of the class
052 * @return a dynamically created instance of the given class
053 */
054 public static Object createInstance(String className) {
055 Object retv = null;
056 try {
057 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
058 Class<?> theFilterClass = Class.forName(className, true, classLoader);
059 Constructor<?> meth = theFilterClass.getDeclaredConstructor(argArray);
060 meth.setAccessible(true);
061 retv = meth.newInstance();
062 } catch (Exception e) {
063 throw new RuntimeException(e);
064 }
065 return retv;
066 }
067
068 private void createAggregator(Configuration conf) {
069 if (theAggregatorDescriptor == null) {
070 theAggregatorDescriptor = (ValueAggregatorDescriptor)
071 createInstance(this.className);
072 theAggregatorDescriptor.configure(conf);
073 }
074 }
075
076 /**
077 *
078 * @param className the class name of the user defined descriptor class
079 * @param conf a configure object used for decriptor configuration
080 */
081 public UserDefinedValueAggregatorDescriptor(String className,
082 Configuration conf) {
083 this.className = className;
084 this.createAggregator(conf);
085 }
086
087 /**
088 * Generate a list of aggregation-id/value pairs for the given
089 * key/value pairs by delegating the invocation to the real object.
090 *
091 * @param key
092 * input key
093 * @param val
094 * input value
095 * @return a list of aggregation id/value pairs. An aggregation id encodes an
096 * aggregation type which is used to guide the way to aggregate the
097 * value in the reduce/combiner phrase of an Aggregate based job.
098 */
099 public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
100 Object val) {
101 ArrayList<Entry<Text, Text>> retv = new ArrayList<Entry<Text, Text>>();
102 if (this.theAggregatorDescriptor != null) {
103 retv = this.theAggregatorDescriptor.generateKeyValPairs(key, val);
104 }
105 return retv;
106 }
107
108 /**
109 * @return the string representation of this object.
110 */
111 public String toString() {
112 return "UserDefinedValueAggregatorDescriptor with class name:" + "\t"
113 + this.className;
114 }
115
116 /**
117 * Do nothing.
118 */
119 public void configure(Configuration conf) {
120
121 }
122
123 }