001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.util; 020 021import java.io.ByteArrayOutputStream; 022import java.io.IOException; 023import java.io.PrintWriter; 024import java.lang.management.ManagementFactory; 025import java.lang.management.ThreadInfo; 026import java.lang.management.ThreadMXBean; 027import java.lang.reflect.Constructor; 028import java.lang.reflect.Method; 029import java.util.Map; 030import java.util.concurrent.ConcurrentHashMap; 031 032import org.apache.commons.logging.Log; 033import org.apache.hadoop.classification.InterfaceAudience; 034import org.apache.hadoop.classification.InterfaceStability; 035import org.apache.hadoop.conf.Configurable; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.io.DataInputBuffer; 038import org.apache.hadoop.io.DataOutputBuffer; 039import org.apache.hadoop.io.Writable; 040import org.apache.hadoop.io.serializer.Deserializer; 041import org.apache.hadoop.io.serializer.SerializationFactory; 042import org.apache.hadoop.io.serializer.Serializer; 043 044/** 045 * General reflection utils 046 */ 047@InterfaceAudience.Public 048@InterfaceStability.Evolving 049public class ReflectionUtils { 050 051 private static final Class<?>[] EMPTY_ARRAY = new Class[]{}; 052 volatile private static SerializationFactory serialFactory = null; 053 054 /** 055 * Cache of constructors for each class. Pins the classes so they 056 * can't be garbage collected until ReflectionUtils can be collected. 057 */ 058 private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 059 new ConcurrentHashMap<Class<?>, Constructor<?>>(); 060 061 /** 062 * Check and set 'configuration' if necessary. 063 * 064 * @param theObject object for which to set configuration 065 * @param conf Configuration 066 */ 067 public static void setConf(Object theObject, Configuration conf) { 068 if (conf != null) { 069 if (theObject instanceof Configurable) { 070 ((Configurable) theObject).setConf(conf); 071 } 072 setJobConf(theObject, conf); 073 } 074 } 075 076 /** 077 * This code is to support backward compatibility and break the compile 078 * time dependency of core on mapred. 079 * This should be made deprecated along with the mapred package HADOOP-1230. 080 * Should be removed when mapred package is removed. 081 */ 082 private static void setJobConf(Object theObject, Configuration conf) { 083 //If JobConf and JobConfigurable are in classpath, AND 084 //theObject is of type JobConfigurable AND 085 //conf is of type JobConf then 086 //invoke configure on theObject 087 try { 088 Class<?> jobConfClass = 089 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf"); 090 if (jobConfClass == null) { 091 return; 092 } 093 094 Class<?> jobConfigurableClass = 095 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable"); 096 if (jobConfigurableClass == null) { 097 return; 098 } 099 if (jobConfClass.isAssignableFrom(conf.getClass()) && 100 jobConfigurableClass.isAssignableFrom(theObject.getClass())) { 101 Method configureMethod = 102 jobConfigurableClass.getMethod("configure", jobConfClass); 103 configureMethod.invoke(theObject, conf); 104 } 105 } catch (Exception e) { 106 throw new RuntimeException("Error in configuring object", e); 107 } 108 } 109 110 /** Create an object for the given class and initialize it from conf 111 * 112 * @param theClass class of which an object is created 113 * @param conf Configuration 114 * @return a new object 115 */ 116 @SuppressWarnings("unchecked") 117 public static <T> T newInstance(Class<T> theClass, Configuration conf) { 118 T result; 119 try { 120 Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); 121 if (meth == null) { 122 meth = theClass.getDeclaredConstructor(EMPTY_ARRAY); 123 meth.setAccessible(true); 124 CONSTRUCTOR_CACHE.put(theClass, meth); 125 } 126 result = meth.newInstance(); 127 } catch (Exception e) { 128 throw new RuntimeException(e); 129 } 130 setConf(result, conf); 131 return result; 132 } 133 134 static private ThreadMXBean threadBean = 135 ManagementFactory.getThreadMXBean(); 136 137 public static void setContentionTracing(boolean val) { 138 threadBean.setThreadContentionMonitoringEnabled(val); 139 } 140 141 private static String getTaskName(long id, String name) { 142 if (name == null) { 143 return Long.toString(id); 144 } 145 return id + " (" + name + ")"; 146 } 147 148 /** 149 * Print all of the thread's information and stack traces. 150 * 151 * @param stream the stream to 152 * @param title a string title for the stack trace 153 */ 154 public static void printThreadInfo(PrintWriter stream, 155 String title) { 156 final int STACK_DEPTH = 20; 157 boolean contention = threadBean.isThreadContentionMonitoringEnabled(); 158 long[] threadIds = threadBean.getAllThreadIds(); 159 stream.println("Process Thread Dump: " + title); 160 stream.println(threadIds.length + " active threads"); 161 for (long tid: threadIds) { 162 ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH); 163 if (info == null) { 164 stream.println(" Inactive"); 165 continue; 166 } 167 stream.println("Thread " + 168 getTaskName(info.getThreadId(), 169 info.getThreadName()) + ":"); 170 Thread.State state = info.getThreadState(); 171 stream.println(" State: " + state); 172 stream.println(" Blocked count: " + info.getBlockedCount()); 173 stream.println(" Waited count: " + info.getWaitedCount()); 174 if (contention) { 175 stream.println(" Blocked time: " + info.getBlockedTime()); 176 stream.println(" Waited time: " + info.getWaitedTime()); 177 } 178 if (state == Thread.State.WAITING) { 179 stream.println(" Waiting on " + info.getLockName()); 180 } else if (state == Thread.State.BLOCKED) { 181 stream.println(" Blocked on " + info.getLockName()); 182 stream.println(" Blocked by " + 183 getTaskName(info.getLockOwnerId(), 184 info.getLockOwnerName())); 185 } 186 stream.println(" Stack:"); 187 for (StackTraceElement frame: info.getStackTrace()) { 188 stream.println(" " + frame.toString()); 189 } 190 } 191 stream.flush(); 192 } 193 194 private static long previousLogTime = 0; 195 196 /** 197 * Log the current thread stacks at INFO level. 198 * @param log the logger that logs the stack trace 199 * @param title a descriptive title for the call stacks 200 * @param minInterval the minimum time from the last 201 */ 202 public static void logThreadInfo(Log log, 203 String title, 204 long minInterval) { 205 boolean dumpStack = false; 206 if (log.isInfoEnabled()) { 207 synchronized (ReflectionUtils.class) { 208 long now = System.currentTimeMillis(); 209 if (now - previousLogTime >= minInterval * 1000) { 210 previousLogTime = now; 211 dumpStack = true; 212 } 213 } 214 if (dumpStack) { 215 ByteArrayOutputStream buffer = new ByteArrayOutputStream(); 216 printThreadInfo(new PrintWriter(buffer), title); 217 log.info(buffer.toString()); 218 } 219 } 220 } 221 222 /** 223 * Return the correctly-typed {@link Class} of the given object. 224 * 225 * @param o object whose correctly-typed <code>Class</code> is to be obtained 226 * @return the correctly typed <code>Class</code> of the given object. 227 */ 228 @SuppressWarnings("unchecked") 229 public static <T> Class<T> getClass(T o) { 230 return (Class<T>)o.getClass(); 231 } 232 233 // methods to support testing 234 static void clearCache() { 235 CONSTRUCTOR_CACHE.clear(); 236 } 237 238 static int getCacheSize() { 239 return CONSTRUCTOR_CACHE.size(); 240 } 241 /** 242 * A pair of input/output buffers that we use to clone writables. 243 */ 244 private static class CopyInCopyOutBuffer { 245 DataOutputBuffer outBuffer = new DataOutputBuffer(); 246 DataInputBuffer inBuffer = new DataInputBuffer(); 247 /** 248 * Move the data from the output buffer to the input buffer. 249 */ 250 void moveData() { 251 inBuffer.reset(outBuffer.getData(), outBuffer.getLength()); 252 } 253 } 254 255 /** 256 * Allocate a buffer for each thread that tries to clone objects. 257 */ 258 private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers 259 = new ThreadLocal<CopyInCopyOutBuffer>() { 260 protected synchronized CopyInCopyOutBuffer initialValue() { 261 return new CopyInCopyOutBuffer(); 262 } 263 }; 264 265 private static SerializationFactory getFactory(Configuration conf) { 266 if (serialFactory == null) { 267 serialFactory = new SerializationFactory(conf); 268 } 269 return serialFactory; 270 } 271 272 /** 273 * Make a copy of the writable object using serialization to a buffer 274 * @param dst the object to copy from 275 * @param src the object to copy into, which is destroyed 276 * @throws IOException 277 */ 278 @SuppressWarnings("unchecked") 279 public static <T> T copy(Configuration conf, 280 T src, T dst) throws IOException { 281 CopyInCopyOutBuffer buffer = cloneBuffers.get(); 282 buffer.outBuffer.reset(); 283 SerializationFactory factory = getFactory(conf); 284 Class<T> cls = (Class<T>) src.getClass(); 285 Serializer<T> serializer = factory.getSerializer(cls); 286 serializer.open(buffer.outBuffer); 287 serializer.serialize(src); 288 buffer.moveData(); 289 Deserializer<T> deserializer = factory.getDeserializer(cls); 290 deserializer.open(buffer.inBuffer); 291 dst = deserializer.deserialize(dst); 292 return dst; 293 } 294 295 @Deprecated 296 public static void cloneWritableInto(Writable dst, 297 Writable src) throws IOException { 298 CopyInCopyOutBuffer buffer = cloneBuffers.get(); 299 buffer.outBuffer.reset(); 300 src.write(buffer.outBuffer); 301 buffer.moveData(); 302 dst.readFields(buffer.inBuffer); 303 } 304}