001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.util; 020 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.io.PrintWriter; 024import java.lang.management.ManagementFactory; 025import java.lang.management.ThreadInfo; 026import java.lang.management.ThreadMXBean; 027import java.lang.reflect.Constructor; 028import java.lang.reflect.Field; 029import java.lang.reflect.Method; 030import java.util.ArrayList; 031import java.util.List; 032import java.util.Map; 033import java.util.concurrent.ConcurrentHashMap; 034 035import org.apache.commons.logging.Log; 036import org.apache.hadoop.classification.InterfaceAudience; 037import org.apache.hadoop.classification.InterfaceStability; 038import org.apache.hadoop.conf.Configurable; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.io.DataInputBuffer; 041import org.apache.hadoop.io.DataOutputBuffer; 042import org.apache.hadoop.io.Writable; 043import org.apache.hadoop.io.serializer.Deserializer; 044import org.apache.hadoop.io.serializer.SerializationFactory; 045import org.apache.hadoop.io.serializer.Serializer; 046 047/** 048 * General reflection utils 049 */ 050@InterfaceAudience.Public 051@InterfaceStability.Evolving 052public class ReflectionUtils { 053 054 private static final Class<?>[] EMPTY_ARRAY = new Class[]{}; 055 volatile private static SerializationFactory serialFactory = null; 056 057 /** 058 * Cache of constructors for each class. Pins the classes so they 059 * can't be garbage collected until ReflectionUtils can be collected. 060 */ 061 private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 062 new ConcurrentHashMap<Class<?>, Constructor<?>>(); 063 064 /** 065 * Check and set 'configuration' if necessary. 066 * 067 * @param theObject object for which to set configuration 068 * @param conf Configuration 069 */ 070 public static void setConf(Object theObject, Configuration conf) { 071 if (conf != null) { 072 if (theObject instanceof Configurable) { 073 ((Configurable) theObject).setConf(conf); 074 } 075 setJobConf(theObject, conf); 076 } 077 } 078 079 /** 080 * This code is to support backward compatibility and break the compile 081 * time dependency of core on mapred. 082 * This should be made deprecated along with the mapred package HADOOP-1230. 083 * Should be removed when mapred package is removed. 084 */ 085 private static void setJobConf(Object theObject, Configuration conf) { 086 //If JobConf and JobConfigurable are in classpath, AND 087 //theObject is of type JobConfigurable AND 088 //conf is of type JobConf then 089 //invoke configure on theObject 090 try { 091 Class<?> jobConfClass = 092 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf"); 093 if (jobConfClass == null) { 094 return; 095 } 096 097 Class<?> jobConfigurableClass = 098 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable"); 099 if (jobConfigurableClass == null) { 100 return; 101 } 102 if (jobConfClass.isAssignableFrom(conf.getClass()) && 103 jobConfigurableClass.isAssignableFrom(theObject.getClass())) { 104 Method configureMethod = 105 jobConfigurableClass.getMethod("configure", jobConfClass); 106 configureMethod.invoke(theObject, conf); 107 } 108 } catch (Exception e) { 109 throw new RuntimeException("Error in configuring object", e); 110 } 111 } 112 113 /** Create an object for the given class and initialize it from conf 114 * 115 * @param theClass class of which an object is created 116 * @param conf Configuration 117 * @return a new object 118 */ 119 @SuppressWarnings("unchecked") 120 public static <T> T newInstance(Class<T> theClass, Configuration conf) { 121 T result; 122 try { 123 Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); 124 if (meth == null) { 125 meth = theClass.getDeclaredConstructor(EMPTY_ARRAY); 126 meth.setAccessible(true); 127 CONSTRUCTOR_CACHE.put(theClass, meth); 128 } 129 result = meth.newInstance(); 130 } catch (Exception e) { 131 throw new RuntimeException(e); 132 } 133 setConf(result, conf); 134 return result; 135 } 136 137 static private ThreadMXBean threadBean = 138 ManagementFactory.getThreadMXBean(); 139 140 public static void setContentionTracing(boolean val) { 141 threadBean.setThreadContentionMonitoringEnabled(val); 142 } 143 144 private static String getTaskName(long id, String name) { 145 if (name == null) { 146 return Long.toString(id); 147 } 148 return id + " (" + name + ")"; 149 } 150 151 /** 152 * Print all of the thread's information and stack traces. 153 * 154 * @param stream the stream to 155 * @param title a string title for the stack trace 156 */ 157 public synchronized static void printThreadInfo(PrintWriter stream, 158 String title) { 159 final int STACK_DEPTH = 20; 160 boolean contention = threadBean.isThreadContentionMonitoringEnabled(); 161 long[] threadIds = threadBean.getAllThreadIds(); 162 stream.println("Process Thread Dump: " + title); 163 stream.println(threadIds.length + " active threads"); 164 for (long tid: threadIds) { 165 ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH); 166 if (info == null) { 167 stream.println(" Inactive"); 168 continue; 169 } 170 stream.println("Thread " + 171 getTaskName(info.getThreadId(), 172 info.getThreadName()) + ":"); 173 Thread.State state = info.getThreadState(); 174 stream.println(" State: " + state); 175 stream.println(" Blocked count: " + info.getBlockedCount()); 176 stream.println(" Waited count: " + info.getWaitedCount()); 177 if (contention) { 178 stream.println(" Blocked time: " + info.getBlockedTime()); 179 stream.println(" Waited time: " + info.getWaitedTime()); 180 } 181 if (state == Thread.State.WAITING) { 182 stream.println(" Waiting on " + info.getLockName()); 183 } else if (state == Thread.State.BLOCKED) { 184 stream.println(" Blocked on " + info.getLockName()); 185 stream.println(" Blocked by " + 186 getTaskName(info.getLockOwnerId(), 187 info.getLockOwnerName())); 188 } 189 stream.println(" Stack:"); 190 for (StackTraceElement frame: info.getStackTrace()) { 191 stream.println(" " + frame.toString()); 192 } 193 } 194 stream.flush(); 195 } 196 197 private static long previousLogTime = 0; 198 199 /** 200 * Log the current thread stacks at INFO level. 201 * @param log the logger that logs the stack trace 202 * @param title a descriptive title for the call stacks 203 * @param minInterval the minimum time from the last 204 */ 205 public static void logThreadInfo(Log log, 206 String title, 207 long minInterval) { 208 boolean dumpStack = false; 209 if (log.isInfoEnabled()) { 210 synchronized (ReflectionUtils.class) { 211 long now = Time.now(); 212 if (now - previousLogTime >= minInterval * 1000) { 213 previousLogTime = now; 214 dumpStack = true; 215 } 216 } 217 if (dumpStack) { 218 ByteArrayOutputStream buffer = new ByteArrayOutputStream(); 219 printThreadInfo(new PrintWriter(buffer), title); 220 log.info(buffer.toString()); 221 } 222 } 223 } 224 225 /** 226 * Return the correctly-typed {@link Class} of the given object. 227 * 228 * @param o object whose correctly-typed <code>Class</code> is to be obtained 229 * @return the correctly typed <code>Class</code> of the given object. 230 */ 231 @SuppressWarnings("unchecked") 232 public static <T> Class<T> getClass(T o) { 233 return (Class<T>)o.getClass(); 234 } 235 236 // methods to support testing 237 static void clearCache() { 238 CONSTRUCTOR_CACHE.clear(); 239 } 240 241 static int getCacheSize() { 242 return CONSTRUCTOR_CACHE.size(); 243 } 244 /** 245 * A pair of input/output buffers that we use to clone writables. 246 */ 247 private static class CopyInCopyOutBuffer { 248 DataOutputBuffer outBuffer = new DataOutputBuffer(); 249 DataInputBuffer inBuffer = new DataInputBuffer(); 250 /** 251 * Move the data from the output buffer to the input buffer. 252 */ 253 void moveData() { 254 inBuffer.reset(outBuffer.getData(), outBuffer.getLength()); 255 } 256 } 257 258 /** 259 * Allocate a buffer for each thread that tries to clone objects. 260 */ 261 private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers 262 = new ThreadLocal<CopyInCopyOutBuffer>() { 263 @Override 264 protected synchronized CopyInCopyOutBuffer initialValue() { 265 return new CopyInCopyOutBuffer(); 266 } 267 }; 268 269 private static SerializationFactory getFactory(Configuration conf) { 270 if (serialFactory == null) { 271 serialFactory = new SerializationFactory(conf); 272 } 273 return serialFactory; 274 } 275 276 /** 277 * Make a copy of the writable object using serialization to a buffer 278 * @param src the object to copy from 279 * @param dst the object to copy into, which is destroyed 280 * @return dst param (the copy) 281 * @throws IOException 282 */ 283 @SuppressWarnings("unchecked") 284 public static <T> T copy(Configuration conf, 285 T src, T dst) throws IOException { 286 CopyInCopyOutBuffer buffer = cloneBuffers.get(); 287 buffer.outBuffer.reset(); 288 SerializationFactory factory = getFactory(conf); 289 Class<T> cls = (Class<T>) src.getClass(); 290 Serializer<T> serializer = factory.getSerializer(cls); 291 serializer.open(buffer.outBuffer); 292 serializer.serialize(src); 293 buffer.moveData(); 294 Deserializer<T> deserializer = factory.getDeserializer(cls); 295 deserializer.open(buffer.inBuffer); 296 dst = deserializer.deserialize(dst); 297 return dst; 298 } 299 300 @Deprecated 301 public static void cloneWritableInto(Writable dst, 302 Writable src) throws IOException { 303 CopyInCopyOutBuffer buffer = cloneBuffers.get(); 304 buffer.outBuffer.reset(); 305 src.write(buffer.outBuffer); 306 buffer.moveData(); 307 dst.readFields(buffer.inBuffer); 308 } 309 310 /** 311 * Gets all the declared fields of a class including fields declared in 312 * superclasses. 313 */ 314 public static List<Field> getDeclaredFieldsIncludingInherited(Class<?> clazz) { 315 List<Field> fields = new ArrayList<Field>(); 316 while (clazz != null) { 317 for (Field field : clazz.getDeclaredFields()) { 318 fields.add(field); 319 } 320 clazz = clazz.getSuperclass(); 321 } 322 323 return fields; 324 } 325 326 /** 327 * Gets all the declared methods of a class including methods declared in 328 * superclasses. 329 */ 330 public static List<Method> getDeclaredMethodsIncludingInherited(Class<?> clazz) { 331 List<Method> methods = new ArrayList<Method>(); 332 while (clazz != null) { 333 for (Method method : clazz.getDeclaredMethods()) { 334 methods.add(method); 335 } 336 clazz = clazz.getSuperclass(); 337 } 338 339 return methods; 340 } 341}