001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.util;
020
021 import java.io.ByteArrayOutputStream;
022 import java.io.IOException;
023 import java.io.PrintWriter;
024 import java.lang.management.ManagementFactory;
025 import java.lang.management.ThreadInfo;
026 import java.lang.management.ThreadMXBean;
027 import java.lang.reflect.Constructor;
028 import java.lang.reflect.Field;
029 import java.lang.reflect.Method;
030 import java.util.ArrayList;
031 import java.util.List;
032 import java.util.Map;
033 import java.util.concurrent.ConcurrentHashMap;
034
035 import org.apache.commons.logging.Log;
036 import org.apache.hadoop.classification.InterfaceAudience;
037 import org.apache.hadoop.classification.InterfaceStability;
038 import org.apache.hadoop.conf.Configurable;
039 import org.apache.hadoop.conf.Configuration;
040 import org.apache.hadoop.io.DataInputBuffer;
041 import org.apache.hadoop.io.DataOutputBuffer;
042 import org.apache.hadoop.io.Writable;
043 import org.apache.hadoop.io.serializer.Deserializer;
044 import org.apache.hadoop.io.serializer.SerializationFactory;
045 import org.apache.hadoop.io.serializer.Serializer;
046
047 /**
048 * General reflection utils
049 */
050 @InterfaceAudience.Public
051 @InterfaceStability.Evolving
052 public class ReflectionUtils {
053
054 private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
055 volatile private static SerializationFactory serialFactory = null;
056
057 /**
058 * Cache of constructors for each class. Pins the classes so they
059 * can't be garbage collected until ReflectionUtils can be collected.
060 */
061 private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
062 new ConcurrentHashMap<Class<?>, Constructor<?>>();
063
064 /**
065 * Check and set 'configuration' if necessary.
066 *
067 * @param theObject object for which to set configuration
068 * @param conf Configuration
069 */
070 public static void setConf(Object theObject, Configuration conf) {
071 if (conf != null) {
072 if (theObject instanceof Configurable) {
073 ((Configurable) theObject).setConf(conf);
074 }
075 setJobConf(theObject, conf);
076 }
077 }
078
079 /**
080 * This code is to support backward compatibility and break the compile
081 * time dependency of core on mapred.
082 * This should be made deprecated along with the mapred package HADOOP-1230.
083 * Should be removed when mapred package is removed.
084 */
085 private static void setJobConf(Object theObject, Configuration conf) {
086 //If JobConf and JobConfigurable are in classpath, AND
087 //theObject is of type JobConfigurable AND
088 //conf is of type JobConf then
089 //invoke configure on theObject
090 try {
091 Class<?> jobConfClass =
092 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf");
093 if (jobConfClass == null) {
094 return;
095 }
096
097 Class<?> jobConfigurableClass =
098 conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable");
099 if (jobConfigurableClass == null) {
100 return;
101 }
102 if (jobConfClass.isAssignableFrom(conf.getClass()) &&
103 jobConfigurableClass.isAssignableFrom(theObject.getClass())) {
104 Method configureMethod =
105 jobConfigurableClass.getMethod("configure", jobConfClass);
106 configureMethod.invoke(theObject, conf);
107 }
108 } catch (Exception e) {
109 throw new RuntimeException("Error in configuring object", e);
110 }
111 }
112
113 /** Create an object for the given class and initialize it from conf
114 *
115 * @param theClass class of which an object is created
116 * @param conf Configuration
117 * @return a new object
118 */
119 @SuppressWarnings("unchecked")
120 public static <T> T newInstance(Class<T> theClass, Configuration conf) {
121 T result;
122 try {
123 Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
124 if (meth == null) {
125 meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
126 meth.setAccessible(true);
127 CONSTRUCTOR_CACHE.put(theClass, meth);
128 }
129 result = meth.newInstance();
130 } catch (Exception e) {
131 throw new RuntimeException(e);
132 }
133 setConf(result, conf);
134 return result;
135 }
136
137 static private ThreadMXBean threadBean =
138 ManagementFactory.getThreadMXBean();
139
140 public static void setContentionTracing(boolean val) {
141 threadBean.setThreadContentionMonitoringEnabled(val);
142 }
143
144 private static String getTaskName(long id, String name) {
145 if (name == null) {
146 return Long.toString(id);
147 }
148 return id + " (" + name + ")";
149 }
150
151 /**
152 * Print all of the thread's information and stack traces.
153 *
154 * @param stream the stream to
155 * @param title a string title for the stack trace
156 */
157 public synchronized static void printThreadInfo(PrintWriter stream,
158 String title) {
159 final int STACK_DEPTH = 20;
160 boolean contention = threadBean.isThreadContentionMonitoringEnabled();
161 long[] threadIds = threadBean.getAllThreadIds();
162 stream.println("Process Thread Dump: " + title);
163 stream.println(threadIds.length + " active threads");
164 for (long tid: threadIds) {
165 ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
166 if (info == null) {
167 stream.println(" Inactive");
168 continue;
169 }
170 stream.println("Thread " +
171 getTaskName(info.getThreadId(),
172 info.getThreadName()) + ":");
173 Thread.State state = info.getThreadState();
174 stream.println(" State: " + state);
175 stream.println(" Blocked count: " + info.getBlockedCount());
176 stream.println(" Waited count: " + info.getWaitedCount());
177 if (contention) {
178 stream.println(" Blocked time: " + info.getBlockedTime());
179 stream.println(" Waited time: " + info.getWaitedTime());
180 }
181 if (state == Thread.State.WAITING) {
182 stream.println(" Waiting on " + info.getLockName());
183 } else if (state == Thread.State.BLOCKED) {
184 stream.println(" Blocked on " + info.getLockName());
185 stream.println(" Blocked by " +
186 getTaskName(info.getLockOwnerId(),
187 info.getLockOwnerName()));
188 }
189 stream.println(" Stack:");
190 for (StackTraceElement frame: info.getStackTrace()) {
191 stream.println(" " + frame.toString());
192 }
193 }
194 stream.flush();
195 }
196
197 private static long previousLogTime = 0;
198
199 /**
200 * Log the current thread stacks at INFO level.
201 * @param log the logger that logs the stack trace
202 * @param title a descriptive title for the call stacks
203 * @param minInterval the minimum time from the last
204 */
205 public static void logThreadInfo(Log log,
206 String title,
207 long minInterval) {
208 boolean dumpStack = false;
209 if (log.isInfoEnabled()) {
210 synchronized (ReflectionUtils.class) {
211 long now = Time.now();
212 if (now - previousLogTime >= minInterval * 1000) {
213 previousLogTime = now;
214 dumpStack = true;
215 }
216 }
217 if (dumpStack) {
218 ByteArrayOutputStream buffer = new ByteArrayOutputStream();
219 printThreadInfo(new PrintWriter(buffer), title);
220 log.info(buffer.toString());
221 }
222 }
223 }
224
225 /**
226 * Return the correctly-typed {@link Class} of the given object.
227 *
228 * @param o object whose correctly-typed <code>Class</code> is to be obtained
229 * @return the correctly typed <code>Class</code> of the given object.
230 */
231 @SuppressWarnings("unchecked")
232 public static <T> Class<T> getClass(T o) {
233 return (Class<T>)o.getClass();
234 }
235
236 // methods to support testing
237 static void clearCache() {
238 CONSTRUCTOR_CACHE.clear();
239 }
240
241 static int getCacheSize() {
242 return CONSTRUCTOR_CACHE.size();
243 }
244 /**
245 * A pair of input/output buffers that we use to clone writables.
246 */
247 private static class CopyInCopyOutBuffer {
248 DataOutputBuffer outBuffer = new DataOutputBuffer();
249 DataInputBuffer inBuffer = new DataInputBuffer();
250 /**
251 * Move the data from the output buffer to the input buffer.
252 */
253 void moveData() {
254 inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
255 }
256 }
257
258 /**
259 * Allocate a buffer for each thread that tries to clone objects.
260 */
261 private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers
262 = new ThreadLocal<CopyInCopyOutBuffer>() {
263 @Override
264 protected synchronized CopyInCopyOutBuffer initialValue() {
265 return new CopyInCopyOutBuffer();
266 }
267 };
268
269 private static SerializationFactory getFactory(Configuration conf) {
270 if (serialFactory == null) {
271 serialFactory = new SerializationFactory(conf);
272 }
273 return serialFactory;
274 }
275
276 /**
277 * Make a copy of the writable object using serialization to a buffer
278 * @param src the object to copy from
279 * @param dst the object to copy into, which is destroyed
280 * @return dst param (the copy)
281 * @throws IOException
282 */
283 @SuppressWarnings("unchecked")
284 public static <T> T copy(Configuration conf,
285 T src, T dst) throws IOException {
286 CopyInCopyOutBuffer buffer = cloneBuffers.get();
287 buffer.outBuffer.reset();
288 SerializationFactory factory = getFactory(conf);
289 Class<T> cls = (Class<T>) src.getClass();
290 Serializer<T> serializer = factory.getSerializer(cls);
291 serializer.open(buffer.outBuffer);
292 serializer.serialize(src);
293 buffer.moveData();
294 Deserializer<T> deserializer = factory.getDeserializer(cls);
295 deserializer.open(buffer.inBuffer);
296 dst = deserializer.deserialize(dst);
297 return dst;
298 }
299
300 @Deprecated
301 public static void cloneWritableInto(Writable dst,
302 Writable src) throws IOException {
303 CopyInCopyOutBuffer buffer = cloneBuffers.get();
304 buffer.outBuffer.reset();
305 src.write(buffer.outBuffer);
306 buffer.moveData();
307 dst.readFields(buffer.inBuffer);
308 }
309
310 /**
311 * Gets all the declared fields of a class including fields declared in
312 * superclasses.
313 */
314 public static List<Field> getDeclaredFieldsIncludingInherited(Class<?> clazz) {
315 List<Field> fields = new ArrayList<Field>();
316 while (clazz != null) {
317 for (Field field : clazz.getDeclaredFields()) {
318 fields.add(field);
319 }
320 clazz = clazz.getSuperclass();
321 }
322
323 return fields;
324 }
325
326 /**
327 * Gets all the declared methods of a class including methods declared in
328 * superclasses.
329 */
330 public static List<Method> getDeclaredMethodsIncludingInherited(Class<?> clazz) {
331 List<Method> methods = new ArrayList<Method>();
332 while (clazz != null) {
333 for (Method method : clazz.getDeclaredMethods()) {
334 methods.add(method);
335 }
336 clazz = clazz.getSuperclass();
337 }
338
339 return methods;
340 }
341 }