001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.util;
020
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.io.PrintStream;
024import java.io.PrintWriter;
025import java.io.UnsupportedEncodingException;
026import java.lang.management.ManagementFactory;
027import java.lang.management.ThreadInfo;
028import java.lang.management.ThreadMXBean;
029import java.lang.reflect.Constructor;
030import java.lang.reflect.Field;
031import java.lang.reflect.Method;
032import java.nio.charset.Charset;
033import java.util.ArrayList;
034import java.util.List;
035import java.util.Map;
036import java.util.concurrent.ConcurrentHashMap;
037
038import org.apache.commons.logging.Log;
039import org.apache.hadoop.classification.InterfaceAudience;
040import org.apache.hadoop.classification.InterfaceStability;
041import org.apache.hadoop.conf.Configurable;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.io.DataInputBuffer;
044import org.apache.hadoop.io.DataOutputBuffer;
045import org.apache.hadoop.io.Writable;
046import org.apache.hadoop.io.serializer.Deserializer;
047import org.apache.hadoop.io.serializer.SerializationFactory;
048import org.apache.hadoop.io.serializer.Serializer;
049
050/**
051 * General reflection utils
052 */
053@InterfaceAudience.Public
054@InterfaceStability.Evolving
055public class ReflectionUtils {
056    
057  private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
058  volatile private static SerializationFactory serialFactory = null;
059
060  /** 
061   * Cache of constructors for each class. Pins the classes so they
062   * can't be garbage collected until ReflectionUtils can be collected.
063   */
064  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 
065    new ConcurrentHashMap<Class<?>, Constructor<?>>();
066
067  /**
068   * Check and set 'configuration' if necessary.
069   * 
070   * @param theObject object for which to set configuration
071   * @param conf Configuration
072   */
073  public static void setConf(Object theObject, Configuration conf) {
074    if (conf != null) {
075      if (theObject instanceof Configurable) {
076        ((Configurable) theObject).setConf(conf);
077      }
078      setJobConf(theObject, conf);
079    }
080  }
081  
082  /**
083   * This code is to support backward compatibility and break the compile  
084   * time dependency of core on mapred.
085   * This should be made deprecated along with the mapred package HADOOP-1230. 
086   * Should be removed when mapred package is removed.
087   */
088  private static void setJobConf(Object theObject, Configuration conf) {
089    //If JobConf and JobConfigurable are in classpath, AND
090    //theObject is of type JobConfigurable AND
091    //conf is of type JobConf then
092    //invoke configure on theObject
093    try {
094      Class<?> jobConfClass = 
095        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf");
096      if (jobConfClass == null) {
097        return;
098      }
099      
100      Class<?> jobConfigurableClass = 
101        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable");
102      if (jobConfigurableClass == null) {
103        return;
104      }
105      if (jobConfClass.isAssignableFrom(conf.getClass()) &&
106            jobConfigurableClass.isAssignableFrom(theObject.getClass())) {
107        Method configureMethod = 
108          jobConfigurableClass.getMethod("configure", jobConfClass);
109        configureMethod.invoke(theObject, conf);
110      }
111    } catch (Exception e) {
112      throw new RuntimeException("Error in configuring object", e);
113    }
114  }
115
116  /** Create an object for the given class and initialize it from conf
117   * 
118   * @param theClass class of which an object is created
119   * @param conf Configuration
120   * @return a new object
121   */
122  @SuppressWarnings("unchecked")
123  public static <T> T newInstance(Class<T> theClass, Configuration conf) {
124    T result;
125    try {
126      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
127      if (meth == null) {
128        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
129        meth.setAccessible(true);
130        CONSTRUCTOR_CACHE.put(theClass, meth);
131      }
132      result = meth.newInstance();
133    } catch (Exception e) {
134      throw new RuntimeException(e);
135    }
136    setConf(result, conf);
137    return result;
138  }
139
140  static private ThreadMXBean threadBean = 
141    ManagementFactory.getThreadMXBean();
142    
143  public static void setContentionTracing(boolean val) {
144    threadBean.setThreadContentionMonitoringEnabled(val);
145  }
146    
147  private static String getTaskName(long id, String name) {
148    if (name == null) {
149      return Long.toString(id);
150    }
151    return id + " (" + name + ")";
152  }
153    
154  /**
155   * Print all of the thread's information and stack traces.
156   * 
157   * @param stream the stream to
158   * @param title a string title for the stack trace
159   */
160  public synchronized static void printThreadInfo(PrintStream stream,
161                                     String title) {
162    final int STACK_DEPTH = 20;
163    boolean contention = threadBean.isThreadContentionMonitoringEnabled();
164    long[] threadIds = threadBean.getAllThreadIds();
165    stream.println("Process Thread Dump: " + title);
166    stream.println(threadIds.length + " active threads");
167    for (long tid: threadIds) {
168      ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
169      if (info == null) {
170        stream.println("  Inactive");
171        continue;
172      }
173      stream.println("Thread " + 
174                     getTaskName(info.getThreadId(),
175                                 info.getThreadName()) + ":");
176      Thread.State state = info.getThreadState();
177      stream.println("  State: " + state);
178      stream.println("  Blocked count: " + info.getBlockedCount());
179      stream.println("  Waited count: " + info.getWaitedCount());
180      if (contention) {
181        stream.println("  Blocked time: " + info.getBlockedTime());
182        stream.println("  Waited time: " + info.getWaitedTime());
183      }
184      if (state == Thread.State.WAITING) {
185        stream.println("  Waiting on " + info.getLockName());
186      } else  if (state == Thread.State.BLOCKED) {
187        stream.println("  Blocked on " + info.getLockName());
188        stream.println("  Blocked by " + 
189                       getTaskName(info.getLockOwnerId(),
190                                   info.getLockOwnerName()));
191      }
192      stream.println("  Stack:");
193      for (StackTraceElement frame: info.getStackTrace()) {
194        stream.println("    " + frame.toString());
195      }
196    }
197    stream.flush();
198  }
199    
200  private static long previousLogTime = 0;
201    
202  /**
203   * Log the current thread stacks at INFO level.
204   * @param log the logger that logs the stack trace
205   * @param title a descriptive title for the call stacks
206   * @param minInterval the minimum time from the last 
207   */
208  public static void logThreadInfo(Log log,
209                                   String title,
210                                   long minInterval) {
211    boolean dumpStack = false;
212    if (log.isInfoEnabled()) {
213      synchronized (ReflectionUtils.class) {
214        long now = Time.now();
215        if (now - previousLogTime >= minInterval * 1000) {
216          previousLogTime = now;
217          dumpStack = true;
218        }
219      }
220      if (dumpStack) {
221        try {
222          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
223          printThreadInfo(new PrintStream(buffer, false, "UTF-8"), title);
224          log.info(buffer.toString(Charset.defaultCharset().name()));
225        } catch (UnsupportedEncodingException ignored) {
226        }
227      }
228    }
229  }
230
231  /**
232   * Return the correctly-typed {@link Class} of the given object.
233   *  
234   * @param o object whose correctly-typed <code>Class</code> is to be obtained
235   * @return the correctly typed <code>Class</code> of the given object.
236   */
237  @SuppressWarnings("unchecked")
238  public static <T> Class<T> getClass(T o) {
239    return (Class<T>)o.getClass();
240  }
241  
242  // methods to support testing
243  static void clearCache() {
244    CONSTRUCTOR_CACHE.clear();
245  }
246    
247  static int getCacheSize() {
248    return CONSTRUCTOR_CACHE.size();
249  }
250  /**
251   * A pair of input/output buffers that we use to clone writables.
252   */
253  private static class CopyInCopyOutBuffer {
254    DataOutputBuffer outBuffer = new DataOutputBuffer();
255    DataInputBuffer inBuffer = new DataInputBuffer();
256    /**
257     * Move the data from the output buffer to the input buffer.
258     */
259    void moveData() {
260      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
261    }
262  }
263  
264  /**
265   * Allocate a buffer for each thread that tries to clone objects.
266   */
267  private static final ThreadLocal<CopyInCopyOutBuffer> CLONE_BUFFERS
268      = new ThreadLocal<CopyInCopyOutBuffer>() {
269      @Override
270      protected synchronized CopyInCopyOutBuffer initialValue() {
271        return new CopyInCopyOutBuffer();
272      }
273    };
274
275  private static SerializationFactory getFactory(Configuration conf) {
276    if (serialFactory == null) {
277      serialFactory = new SerializationFactory(conf);
278    }
279    return serialFactory;
280  }
281  
282  /**
283   * Make a copy of the writable object using serialization to a buffer
284   * @param src the object to copy from
285   * @param dst the object to copy into, which is destroyed
286   * @return dst param (the copy)
287   * @throws IOException
288   */
289  @SuppressWarnings("unchecked")
290  public static <T> T copy(Configuration conf, 
291                                T src, T dst) throws IOException {
292    CopyInCopyOutBuffer buffer = CLONE_BUFFERS.get();
293    buffer.outBuffer.reset();
294    SerializationFactory factory = getFactory(conf);
295    Class<T> cls = (Class<T>) src.getClass();
296    Serializer<T> serializer = factory.getSerializer(cls);
297    serializer.open(buffer.outBuffer);
298    serializer.serialize(src);
299    buffer.moveData();
300    Deserializer<T> deserializer = factory.getDeserializer(cls);
301    deserializer.open(buffer.inBuffer);
302    dst = deserializer.deserialize(dst);
303    return dst;
304  }
305
306  @Deprecated
307  public static void cloneWritableInto(Writable dst, 
308                                       Writable src) throws IOException {
309    CopyInCopyOutBuffer buffer = CLONE_BUFFERS.get();
310    buffer.outBuffer.reset();
311    src.write(buffer.outBuffer);
312    buffer.moveData();
313    dst.readFields(buffer.inBuffer);
314  }
315  
316  /**
317   * Gets all the declared fields of a class including fields declared in
318   * superclasses.
319   */
320  public static List<Field> getDeclaredFieldsIncludingInherited(Class<?> clazz) {
321    List<Field> fields = new ArrayList<Field>();
322    while (clazz != null) {
323      for (Field field : clazz.getDeclaredFields()) {
324        fields.add(field);
325      }
326      clazz = clazz.getSuperclass();
327    }
328    
329    return fields;
330  }
331  
332  /**
333   * Gets all the declared methods of a class including methods declared in
334   * superclasses.
335   */
336  public static List<Method> getDeclaredMethodsIncludingInherited(Class<?> clazz) {
337    List<Method> methods = new ArrayList<Method>();
338    while (clazz != null) {
339      for (Method method : clazz.getDeclaredMethods()) {
340        methods.add(method);
341      }
342      clazz = clazz.getSuperclass();
343    }
344    
345    return methods;
346  }
347}