001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.util; 020 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.io.PrintStream; 024import java.io.PrintWriter; 025import java.io.UnsupportedEncodingException; 026import java.lang.management.ManagementFactory; 027import java.lang.management.ThreadInfo; 028import java.lang.management.ThreadMXBean; 029import java.lang.reflect.Constructor; 030import java.lang.reflect.Field; 031import java.lang.reflect.Method; 032import java.nio.charset.Charset; 033import java.util.ArrayList; 034import java.util.List; 035import java.util.Map; 036import java.util.concurrent.ConcurrentHashMap; 037 038import org.apache.commons.logging.Log; 039import org.apache.hadoop.classification.InterfaceAudience; 040import org.apache.hadoop.classification.InterfaceStability; 041import org.apache.hadoop.conf.Configurable; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.io.DataInputBuffer; 044import org.apache.hadoop.io.DataOutputBuffer; 045import org.apache.hadoop.io.Writable; 046import org.apache.hadoop.io.serializer.Deserializer; 047import org.apache.hadoop.io.serializer.SerializationFactory; 048import org.apache.hadoop.io.serializer.Serializer; 049 050/** 051 * General reflection utils 052 */ 053@InterfaceAudience.Public 054@InterfaceStability.Evolving 055public class ReflectionUtils { 056 057 private static final Class<?>[] EMPTY_ARRAY = new Class[]{}; 058 volatile private static SerializationFactory serialFactory = null; 059 060 /** 061 * Cache of constructors for each class. Pins the classes so they 062 * can't be garbage collected until ReflectionUtils can be collected. 063 */ 064 private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 065 new ConcurrentHashMap<Class<?>, Constructor<?>>(); 066 067 /** 068 * Check and set 'configuration' if necessary. 069 * 070 * @param theObject object for which to set configuration 071 * @param conf Configuration 072 */ 073 public static void setConf(Object theObject, Configuration conf) { 074 if (conf != null) { 075 if (theObject instanceof Configurable) { 076 ((Configurable) theObject).setConf(conf); 077 } 078 setJobConf(theObject, conf); 079 } 080 } 081 082 /** 083 * This code is to support backward compatibility and break the compile 084 * time dependency of core on mapred. 085 * This should be made deprecated along with the mapred package HADOOP-1230. 086 * Should be removed when mapred package is removed. 087 */ 088 private static void setJobConf(Object theObject, Configuration conf) { 089 //If JobConf and JobConfigurable are in classpath, AND 090 //theObject is of type JobConfigurable AND 091 //conf is of type JobConf then 092 //invoke configure on theObject 093 try { 094 Class<?> jobConfClass = 095 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf"); 096 if (jobConfClass == null) { 097 return; 098 } 099 100 Class<?> jobConfigurableClass = 101 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable"); 102 if (jobConfigurableClass == null) { 103 return; 104 } 105 if (jobConfClass.isAssignableFrom(conf.getClass()) && 106 jobConfigurableClass.isAssignableFrom(theObject.getClass())) { 107 Method configureMethod = 108 jobConfigurableClass.getMethod("configure", jobConfClass); 109 configureMethod.invoke(theObject, conf); 110 } 111 } catch (Exception e) { 112 throw new RuntimeException("Error in configuring object", e); 113 } 114 } 115 116 /** Create an object for the given class and initialize it from conf 117 * 118 * @param theClass class of which an object is created 119 * @param conf Configuration 120 * @return a new object 121 */ 122 @SuppressWarnings("unchecked") 123 public static <T> T newInstance(Class<T> theClass, Configuration conf) { 124 T result; 125 try { 126 Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); 127 if (meth == null) { 128 meth = theClass.getDeclaredConstructor(EMPTY_ARRAY); 129 meth.setAccessible(true); 130 CONSTRUCTOR_CACHE.put(theClass, meth); 131 } 132 result = meth.newInstance(); 133 } catch (Exception e) { 134 throw new RuntimeException(e); 135 } 136 setConf(result, conf); 137 return result; 138 } 139 140 static private ThreadMXBean threadBean = 141 ManagementFactory.getThreadMXBean(); 142 143 public static void setContentionTracing(boolean val) { 144 threadBean.setThreadContentionMonitoringEnabled(val); 145 } 146 147 private static String getTaskName(long id, String name) { 148 if (name == null) { 149 return Long.toString(id); 150 } 151 return id + " (" + name + ")"; 152 } 153 154 /** 155 * Print all of the thread's information and stack traces. 156 * 157 * @param stream the stream to 158 * @param title a string title for the stack trace 159 */ 160 public synchronized static void printThreadInfo(PrintStream stream, 161 String title) { 162 final int STACK_DEPTH = 20; 163 boolean contention = threadBean.isThreadContentionMonitoringEnabled(); 164 long[] threadIds = threadBean.getAllThreadIds(); 165 stream.println("Process Thread Dump: " + title); 166 stream.println(threadIds.length + " active threads"); 167 for (long tid: threadIds) { 168 ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH); 169 if (info == null) { 170 stream.println(" Inactive"); 171 continue; 172 } 173 stream.println("Thread " + 174 getTaskName(info.getThreadId(), 175 info.getThreadName()) + ":"); 176 Thread.State state = info.getThreadState(); 177 stream.println(" State: " + state); 178 stream.println(" Blocked count: " + info.getBlockedCount()); 179 stream.println(" Waited count: " + info.getWaitedCount()); 180 if (contention) { 181 stream.println(" Blocked time: " + info.getBlockedTime()); 182 stream.println(" Waited time: " + info.getWaitedTime()); 183 } 184 if (state == Thread.State.WAITING) { 185 stream.println(" Waiting on " + info.getLockName()); 186 } else if (state == Thread.State.BLOCKED) { 187 stream.println(" Blocked on " + info.getLockName()); 188 stream.println(" Blocked by " + 189 getTaskName(info.getLockOwnerId(), 190 info.getLockOwnerName())); 191 } 192 stream.println(" Stack:"); 193 for (StackTraceElement frame: info.getStackTrace()) { 194 stream.println(" " + frame.toString()); 195 } 196 } 197 stream.flush(); 198 } 199 200 private static long previousLogTime = 0; 201 202 /** 203 * Log the current thread stacks at INFO level. 204 * @param log the logger that logs the stack trace 205 * @param title a descriptive title for the call stacks 206 * @param minInterval the minimum time from the last 207 */ 208 public static void logThreadInfo(Log log, 209 String title, 210 long minInterval) { 211 boolean dumpStack = false; 212 if (log.isInfoEnabled()) { 213 synchronized (ReflectionUtils.class) { 214 long now = Time.now(); 215 if (now - previousLogTime >= minInterval * 1000) { 216 previousLogTime = now; 217 dumpStack = true; 218 } 219 } 220 if (dumpStack) { 221 try { 222 ByteArrayOutputStream buffer = new ByteArrayOutputStream(); 223 printThreadInfo(new PrintStream(buffer, false, "UTF-8"), title); 224 log.info(buffer.toString(Charset.defaultCharset().name())); 225 } catch (UnsupportedEncodingException ignored) { 226 } 227 } 228 } 229 } 230 231 /** 232 * Return the correctly-typed {@link Class} of the given object. 233 * 234 * @param o object whose correctly-typed <code>Class</code> is to be obtained 235 * @return the correctly typed <code>Class</code> of the given object. 236 */ 237 @SuppressWarnings("unchecked") 238 public static <T> Class<T> getClass(T o) { 239 return (Class<T>)o.getClass(); 240 } 241 242 // methods to support testing 243 static void clearCache() { 244 CONSTRUCTOR_CACHE.clear(); 245 } 246 247 static int getCacheSize() { 248 return CONSTRUCTOR_CACHE.size(); 249 } 250 /** 251 * A pair of input/output buffers that we use to clone writables. 252 */ 253 private static class CopyInCopyOutBuffer { 254 DataOutputBuffer outBuffer = new DataOutputBuffer(); 255 DataInputBuffer inBuffer = new DataInputBuffer(); 256 /** 257 * Move the data from the output buffer to the input buffer. 258 */ 259 void moveData() { 260 inBuffer.reset(outBuffer.getData(), outBuffer.getLength()); 261 } 262 } 263 264 /** 265 * Allocate a buffer for each thread that tries to clone objects. 266 */ 267 private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers 268 = new ThreadLocal<CopyInCopyOutBuffer>() { 269 @Override 270 protected synchronized CopyInCopyOutBuffer initialValue() { 271 return new CopyInCopyOutBuffer(); 272 } 273 }; 274 275 private static SerializationFactory getFactory(Configuration conf) { 276 if (serialFactory == null) { 277 serialFactory = new SerializationFactory(conf); 278 } 279 return serialFactory; 280 } 281 282 /** 283 * Make a copy of the writable object using serialization to a buffer 284 * @param src the object to copy from 285 * @param dst the object to copy into, which is destroyed 286 * @return dst param (the copy) 287 * @throws IOException 288 */ 289 @SuppressWarnings("unchecked") 290 public static <T> T copy(Configuration conf, 291 T src, T dst) throws IOException { 292 CopyInCopyOutBuffer buffer = cloneBuffers.get(); 293 buffer.outBuffer.reset(); 294 SerializationFactory factory = getFactory(conf); 295 Class<T> cls = (Class<T>) src.getClass(); 296 Serializer<T> serializer = factory.getSerializer(cls); 297 serializer.open(buffer.outBuffer); 298 serializer.serialize(src); 299 buffer.moveData(); 300 Deserializer<T> deserializer = factory.getDeserializer(cls); 301 deserializer.open(buffer.inBuffer); 302 dst = deserializer.deserialize(dst); 303 return dst; 304 } 305 306 @Deprecated 307 public static void cloneWritableInto(Writable dst, 308 Writable src) throws IOException { 309 CopyInCopyOutBuffer buffer = cloneBuffers.get(); 310 buffer.outBuffer.reset(); 311 src.write(buffer.outBuffer); 312 buffer.moveData(); 313 dst.readFields(buffer.inBuffer); 314 } 315 316 /** 317 * Gets all the declared fields of a class including fields declared in 318 * superclasses. 319 */ 320 public static List<Field> getDeclaredFieldsIncludingInherited(Class<?> clazz) { 321 List<Field> fields = new ArrayList<Field>(); 322 while (clazz != null) { 323 for (Field field : clazz.getDeclaredFields()) { 324 fields.add(field); 325 } 326 clazz = clazz.getSuperclass(); 327 } 328 329 return fields; 330 } 331 332 /** 333 * Gets all the declared methods of a class including methods declared in 334 * superclasses. 335 */ 336 public static List<Method> getDeclaredMethodsIncludingInherited(Class<?> clazz) { 337 List<Method> methods = new ArrayList<Method>(); 338 while (clazz != null) { 339 for (Method method : clazz.getDeclaredMethods()) { 340 methods.add(method); 341 } 342 clazz = clazz.getSuperclass(); 343 } 344 345 return methods; 346 } 347}