/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.aggregate;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Map.Entry;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

/**
 * This class implements a wrapper for a user defined value
 * aggregator descriptor.
 * It serves two functions: One is to create an object of
 * ValueAggregatorDescriptor from the name of a user defined class
 * that may be dynamically loaded. The other is to
 * delegate invocations of generateKeyValPairs function to the created object.
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class UserDefinedValueAggregatorDescriptor implements
    ValueAggregatorDescriptor {
  /** Name of the user defined descriptor class wrapped by this object. */
  private final String className;

  /**
   * The wrapped descriptor. It is created and configured from the
   * constructor; invocations of generateKeyValPairs are delegated to it.
   */
  protected ValueAggregatorDescriptor theAggregatorDescriptor = null;

  /** Parameter types of the no-argument constructor looked up reflectively. */
  private static final Class<?>[] argArray = new Class[] {};

  /**
   * Create an instance of the given class using its no-argument
   * constructor. The constructor is made accessible first, so it does
   * not have to be public.
   *
   * @param className the name of the class
   * @return a dynamically created instance of the given class
   * @throws RuntimeException if the class cannot be loaded or instantiated;
   *         the original failure is preserved as the cause
   */
  public static Object createInstance(String className) {
    try {
      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
      if (classLoader == null) {
        // A null loader would make Class.forName search only the bootstrap
        // loader, which cannot see user classes; use our own loader instead.
        classLoader =
            UserDefinedValueAggregatorDescriptor.class.getClassLoader();
      }
      Class<?> theClass = Class.forName(className, true, classLoader);
      Constructor<?> ctor = theClass.getDeclaredConstructor(argArray);
      ctor.setAccessible(true);
      return ctor.newInstance();
    } catch (Exception e) {
      // Name the offending class so that the failure can be diagnosed.
      throw new RuntimeException(
          "Unable to create an instance of class " + className, e);
    }
  }

  /**
   * Instantiate and configure the wrapped descriptor, unless one is
   * already present.
   *
   * @param conf the configuration handed to the new descriptor
   */
  private void createAggregator(Configuration conf) {
    if (theAggregatorDescriptor == null) {
      theAggregatorDescriptor = (ValueAggregatorDescriptor)
          createInstance(this.className);
      theAggregatorDescriptor.configure(conf);
    }
  }

  /**
   * Create a wrapper around an instance of the named descriptor class.
   * The instance is created and configured immediately.
   *
   * @param className the class name of the user defined descriptor class
   * @param conf a configure object used for descriptor configuration
   */
  public UserDefinedValueAggregatorDescriptor(String className,
      Configuration conf) {
    this.className = className;
    this.createAggregator(conf);
  }

  /**
   * Generate a list of aggregation-id/value pairs for the given
   * key/value pairs by delegating the invocation to the real object.
   *
   * @param key
   *          input key
   * @param val
   *          input value
   * @return a list of aggregation id/value pairs. An aggregation id encodes an
   *         aggregation type which is used to guide the way to aggregate the
   *         value in the reduce/combiner phase of an Aggregate based job.
   *         The list is empty if there is no wrapped descriptor.
   */
  @Override
  public ArrayList<Entry<Text, Text>> generateKeyValPairs(Object key,
      Object val) {
    if (this.theAggregatorDescriptor == null) {
      return new ArrayList<Entry<Text, Text>>();
    }
    return this.theAggregatorDescriptor.generateKeyValPairs(key, val);
  }

  /**
   * @return the string representation of this object.
   */
  @Override
  public String toString() {
    return "UserDefinedValueAggregatorDescriptor with class name:" + "\t"
        + this.className;
  }

  /**
   * Do nothing. The wrapped descriptor is configured when this wrapper
   * is constructed.
   *
   * @param conf ignored
   */
  @Override
  public void configure(Configuration conf) {

  }

}