001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.util;
020
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
046
047/**
048 * General reflection utils
049 */
050@InterfaceAudience.Public
051@InterfaceStability.Evolving
052public class ReflectionUtils {
053    
054  private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
055  volatile private static SerializationFactory serialFactory = null;
056
057  /** 
058   * Cache of constructors for each class. Pins the classes so they
059   * can't be garbage collected until ReflectionUtils can be collected.
060   */
061  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 
062    new ConcurrentHashMap<Class<?>, Constructor<?>>();
063
064  /**
065   * Check and set 'configuration' if necessary.
066   * 
067   * @param theObject object for which to set configuration
068   * @param conf Configuration
069   */
070  public static void setConf(Object theObject, Configuration conf) {
071    if (conf != null) {
072      if (theObject instanceof Configurable) {
073        ((Configurable) theObject).setConf(conf);
074      }
075      setJobConf(theObject, conf);
076    }
077  }
078  
079  /**
080   * This code is to support backward compatibility and break the compile  
081   * time dependency of core on mapred.
082   * This should be made deprecated along with the mapred package HADOOP-1230. 
083   * Should be removed when mapred package is removed.
084   */
085  private static void setJobConf(Object theObject, Configuration conf) {
086    //If JobConf and JobConfigurable are in classpath, AND
087    //theObject is of type JobConfigurable AND
088    //conf is of type JobConf then
089    //invoke configure on theObject
090    try {
091      Class<?> jobConfClass = 
092        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf");
093      if (jobConfClass == null) {
094        return;
095      }
096      
097      Class<?> jobConfigurableClass = 
098        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable");
099      if (jobConfigurableClass == null) {
100        return;
101      }
102      if (jobConfClass.isAssignableFrom(conf.getClass()) &&
103            jobConfigurableClass.isAssignableFrom(theObject.getClass())) {
104        Method configureMethod = 
105          jobConfigurableClass.getMethod("configure", jobConfClass);
106        configureMethod.invoke(theObject, conf);
107      }
108    } catch (Exception e) {
109      throw new RuntimeException("Error in configuring object", e);
110    }
111  }
112
113  /** Create an object for the given class and initialize it from conf
114   * 
115   * @param theClass class of which an object is created
116   * @param conf Configuration
117   * @return a new object
118   */
119  @SuppressWarnings("unchecked")
120  public static <T> T newInstance(Class<T> theClass, Configuration conf) {
121    T result;
122    try {
123      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
124      if (meth == null) {
125        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
126        meth.setAccessible(true);
127        CONSTRUCTOR_CACHE.put(theClass, meth);
128      }
129      result = meth.newInstance();
130    } catch (Exception e) {
131      throw new RuntimeException(e);
132    }
133    setConf(result, conf);
134    return result;
135  }
136
137  static private ThreadMXBean threadBean = 
138    ManagementFactory.getThreadMXBean();
139    
140  public static void setContentionTracing(boolean val) {
141    threadBean.setThreadContentionMonitoringEnabled(val);
142  }
143    
144  private static String getTaskName(long id, String name) {
145    if (name == null) {
146      return Long.toString(id);
147    }
148    return id + " (" + name + ")";
149  }
150    
151  /**
152   * Print all of the thread's information and stack traces.
153   * 
154   * @param stream the stream to
155   * @param title a string title for the stack trace
156   */
157  public synchronized static void printThreadInfo(PrintWriter stream,
158                                     String title) {
159    final int STACK_DEPTH = 20;
160    boolean contention = threadBean.isThreadContentionMonitoringEnabled();
161    long[] threadIds = threadBean.getAllThreadIds();
162    stream.println("Process Thread Dump: " + title);
163    stream.println(threadIds.length + " active threads");
164    for (long tid: threadIds) {
165      ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
166      if (info == null) {
167        stream.println("  Inactive");
168        continue;
169      }
170      stream.println("Thread " + 
171                     getTaskName(info.getThreadId(),
172                                 info.getThreadName()) + ":");
173      Thread.State state = info.getThreadState();
174      stream.println("  State: " + state);
175      stream.println("  Blocked count: " + info.getBlockedCount());
176      stream.println("  Waited count: " + info.getWaitedCount());
177      if (contention) {
178        stream.println("  Blocked time: " + info.getBlockedTime());
179        stream.println("  Waited time: " + info.getWaitedTime());
180      }
181      if (state == Thread.State.WAITING) {
182        stream.println("  Waiting on " + info.getLockName());
183      } else  if (state == Thread.State.BLOCKED) {
184        stream.println("  Blocked on " + info.getLockName());
185        stream.println("  Blocked by " + 
186                       getTaskName(info.getLockOwnerId(),
187                                   info.getLockOwnerName()));
188      }
189      stream.println("  Stack:");
190      for (StackTraceElement frame: info.getStackTrace()) {
191        stream.println("    " + frame.toString());
192      }
193    }
194    stream.flush();
195  }
196    
197  private static long previousLogTime = 0;
198    
199  /**
200   * Log the current thread stacks at INFO level.
201   * @param log the logger that logs the stack trace
202   * @param title a descriptive title for the call stacks
203   * @param minInterval the minimum time from the last 
204   */
205  public static void logThreadInfo(Log log,
206                                   String title,
207                                   long minInterval) {
208    boolean dumpStack = false;
209    if (log.isInfoEnabled()) {
210      synchronized (ReflectionUtils.class) {
211        long now = Time.now();
212        if (now - previousLogTime >= minInterval * 1000) {
213          previousLogTime = now;
214          dumpStack = true;
215        }
216      }
217      if (dumpStack) {
218        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
219        printThreadInfo(new PrintWriter(buffer), title);
220        log.info(buffer.toString());
221      }
222    }
223  }
224
225  /**
226   * Return the correctly-typed {@link Class} of the given object.
227   *  
228   * @param o object whose correctly-typed <code>Class</code> is to be obtained
229   * @return the correctly typed <code>Class</code> of the given object.
230   */
231  @SuppressWarnings("unchecked")
232  public static <T> Class<T> getClass(T o) {
233    return (Class<T>)o.getClass();
234  }
235  
236  // methods to support testing
237  static void clearCache() {
238    CONSTRUCTOR_CACHE.clear();
239  }
240    
241  static int getCacheSize() {
242    return CONSTRUCTOR_CACHE.size();
243  }
244  /**
245   * A pair of input/output buffers that we use to clone writables.
246   */
247  private static class CopyInCopyOutBuffer {
248    DataOutputBuffer outBuffer = new DataOutputBuffer();
249    DataInputBuffer inBuffer = new DataInputBuffer();
250    /**
251     * Move the data from the output buffer to the input buffer.
252     */
253    void moveData() {
254      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
255    }
256  }
257  
258  /**
259   * Allocate a buffer for each thread that tries to clone objects.
260   */
261  private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers
262      = new ThreadLocal<CopyInCopyOutBuffer>() {
263      @Override
264      protected synchronized CopyInCopyOutBuffer initialValue() {
265        return new CopyInCopyOutBuffer();
266      }
267    };
268
269  private static SerializationFactory getFactory(Configuration conf) {
270    if (serialFactory == null) {
271      serialFactory = new SerializationFactory(conf);
272    }
273    return serialFactory;
274  }
275  
276  /**
277   * Make a copy of the writable object using serialization to a buffer
278   * @param src the object to copy from
279   * @param dst the object to copy into, which is destroyed
280   * @return dst param (the copy)
281   * @throws IOException
282   */
283  @SuppressWarnings("unchecked")
284  public static <T> T copy(Configuration conf, 
285                                T src, T dst) throws IOException {
286    CopyInCopyOutBuffer buffer = cloneBuffers.get();
287    buffer.outBuffer.reset();
288    SerializationFactory factory = getFactory(conf);
289    Class<T> cls = (Class<T>) src.getClass();
290    Serializer<T> serializer = factory.getSerializer(cls);
291    serializer.open(buffer.outBuffer);
292    serializer.serialize(src);
293    buffer.moveData();
294    Deserializer<T> deserializer = factory.getDeserializer(cls);
295    deserializer.open(buffer.inBuffer);
296    dst = deserializer.deserialize(dst);
297    return dst;
298  }
299
300  @Deprecated
301  public static void cloneWritableInto(Writable dst, 
302                                       Writable src) throws IOException {
303    CopyInCopyOutBuffer buffer = cloneBuffers.get();
304    buffer.outBuffer.reset();
305    src.write(buffer.outBuffer);
306    buffer.moveData();
307    dst.readFields(buffer.inBuffer);
308  }
309  
310  /**
311   * Gets all the declared fields of a class including fields declared in
312   * superclasses.
313   */
314  public static List<Field> getDeclaredFieldsIncludingInherited(Class<?> clazz) {
315    List<Field> fields = new ArrayList<Field>();
316    while (clazz != null) {
317      for (Field field : clazz.getDeclaredFields()) {
318        fields.add(field);
319      }
320      clazz = clazz.getSuperclass();
321    }
322    
323    return fields;
324  }
325  
326  /**
327   * Gets all the declared methods of a class including methods declared in
328   * superclasses.
329   */
330  public static List<Method> getDeclaredMethodsIncludingInherited(Class<?> clazz) {
331    List<Method> methods = new ArrayList<Method>();
332    while (clazz != null) {
333      for (Method method : clazz.getDeclaredMethods()) {
334        methods.add(method);
335      }
336      clazz = clazz.getSuperclass();
337    }
338    
339    return methods;
340  }
341}