001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.util;
020
021import java.io.ByteArrayOutputStream;
022import java.io.IOException;
023import java.io.PrintWriter;
024import java.lang.management.ManagementFactory;
025import java.lang.management.ThreadInfo;
026import java.lang.management.ThreadMXBean;
027import java.lang.reflect.Constructor;
028import java.lang.reflect.Method;
029import java.util.Map;
030import java.util.concurrent.ConcurrentHashMap;
031
032import org.apache.commons.logging.Log;
033import org.apache.hadoop.classification.InterfaceAudience;
034import org.apache.hadoop.classification.InterfaceStability;
035import org.apache.hadoop.conf.Configurable;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.io.DataInputBuffer;
038import org.apache.hadoop.io.DataOutputBuffer;
039import org.apache.hadoop.io.Writable;
040import org.apache.hadoop.io.serializer.Deserializer;
041import org.apache.hadoop.io.serializer.SerializationFactory;
042import org.apache.hadoop.io.serializer.Serializer;
043
044/**
045 * General reflection utils
046 */
047@InterfaceAudience.Public
048@InterfaceStability.Evolving
049public class ReflectionUtils {
050    
051  private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
052  volatile private static SerializationFactory serialFactory = null;
053
054  /** 
055   * Cache of constructors for each class. Pins the classes so they
056   * can't be garbage collected until ReflectionUtils can be collected.
057   */
058  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = 
059    new ConcurrentHashMap<Class<?>, Constructor<?>>();
060
061  /**
062   * Check and set 'configuration' if necessary.
063   * 
064   * @param theObject object for which to set configuration
065   * @param conf Configuration
066   */
067  public static void setConf(Object theObject, Configuration conf) {
068    if (conf != null) {
069      if (theObject instanceof Configurable) {
070        ((Configurable) theObject).setConf(conf);
071      }
072      setJobConf(theObject, conf);
073    }
074  }
075  
076  /**
077   * This code is to support backward compatibility and break the compile  
078   * time dependency of core on mapred.
079   * This should be made deprecated along with the mapred package HADOOP-1230. 
080   * Should be removed when mapred package is removed.
081   */
082  private static void setJobConf(Object theObject, Configuration conf) {
083    //If JobConf and JobConfigurable are in classpath, AND
084    //theObject is of type JobConfigurable AND
085    //conf is of type JobConf then
086    //invoke configure on theObject
087    try {
088      Class<?> jobConfClass = 
089        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf");
090      if (jobConfClass == null) {
091        return;
092      }
093      
094      Class<?> jobConfigurableClass = 
095        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable");
096      if (jobConfigurableClass == null) {
097        return;
098      }
099      if (jobConfClass.isAssignableFrom(conf.getClass()) &&
100            jobConfigurableClass.isAssignableFrom(theObject.getClass())) {
101        Method configureMethod = 
102          jobConfigurableClass.getMethod("configure", jobConfClass);
103        configureMethod.invoke(theObject, conf);
104      }
105    } catch (Exception e) {
106      throw new RuntimeException("Error in configuring object", e);
107    }
108  }
109
110  /** Create an object for the given class and initialize it from conf
111   * 
112   * @param theClass class of which an object is created
113   * @param conf Configuration
114   * @return a new object
115   */
116  @SuppressWarnings("unchecked")
117  public static <T> T newInstance(Class<T> theClass, Configuration conf) {
118    T result;
119    try {
120      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
121      if (meth == null) {
122        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
123        meth.setAccessible(true);
124        CONSTRUCTOR_CACHE.put(theClass, meth);
125      }
126      result = meth.newInstance();
127    } catch (Exception e) {
128      throw new RuntimeException(e);
129    }
130    setConf(result, conf);
131    return result;
132  }
133
134  static private ThreadMXBean threadBean = 
135    ManagementFactory.getThreadMXBean();
136    
137  public static void setContentionTracing(boolean val) {
138    threadBean.setThreadContentionMonitoringEnabled(val);
139  }
140    
141  private static String getTaskName(long id, String name) {
142    if (name == null) {
143      return Long.toString(id);
144    }
145    return id + " (" + name + ")";
146  }
147    
148  /**
149   * Print all of the thread's information and stack traces.
150   * 
151   * @param stream the stream to
152   * @param title a string title for the stack trace
153   */
154  public static void printThreadInfo(PrintWriter stream,
155                                     String title) {
156    final int STACK_DEPTH = 20;
157    boolean contention = threadBean.isThreadContentionMonitoringEnabled();
158    long[] threadIds = threadBean.getAllThreadIds();
159    stream.println("Process Thread Dump: " + title);
160    stream.println(threadIds.length + " active threads");
161    for (long tid: threadIds) {
162      ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
163      if (info == null) {
164        stream.println("  Inactive");
165        continue;
166      }
167      stream.println("Thread " + 
168                     getTaskName(info.getThreadId(),
169                                 info.getThreadName()) + ":");
170      Thread.State state = info.getThreadState();
171      stream.println("  State: " + state);
172      stream.println("  Blocked count: " + info.getBlockedCount());
173      stream.println("  Waited count: " + info.getWaitedCount());
174      if (contention) {
175        stream.println("  Blocked time: " + info.getBlockedTime());
176        stream.println("  Waited time: " + info.getWaitedTime());
177      }
178      if (state == Thread.State.WAITING) {
179        stream.println("  Waiting on " + info.getLockName());
180      } else  if (state == Thread.State.BLOCKED) {
181        stream.println("  Blocked on " + info.getLockName());
182        stream.println("  Blocked by " + 
183                       getTaskName(info.getLockOwnerId(),
184                                   info.getLockOwnerName()));
185      }
186      stream.println("  Stack:");
187      for (StackTraceElement frame: info.getStackTrace()) {
188        stream.println("    " + frame.toString());
189      }
190    }
191    stream.flush();
192  }
193    
194  private static long previousLogTime = 0;
195    
196  /**
197   * Log the current thread stacks at INFO level.
198   * @param log the logger that logs the stack trace
199   * @param title a descriptive title for the call stacks
200   * @param minInterval the minimum time from the last 
201   */
202  public static void logThreadInfo(Log log,
203                                   String title,
204                                   long minInterval) {
205    boolean dumpStack = false;
206    if (log.isInfoEnabled()) {
207      synchronized (ReflectionUtils.class) {
208        long now = System.currentTimeMillis();
209        if (now - previousLogTime >= minInterval * 1000) {
210          previousLogTime = now;
211          dumpStack = true;
212        }
213      }
214      if (dumpStack) {
215        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
216        printThreadInfo(new PrintWriter(buffer), title);
217        log.info(buffer.toString());
218      }
219    }
220  }
221
222  /**
223   * Return the correctly-typed {@link Class} of the given object.
224   *  
225   * @param o object whose correctly-typed <code>Class</code> is to be obtained
226   * @return the correctly typed <code>Class</code> of the given object.
227   */
228  @SuppressWarnings("unchecked")
229  public static <T> Class<T> getClass(T o) {
230    return (Class<T>)o.getClass();
231  }
232  
233  // methods to support testing
234  static void clearCache() {
235    CONSTRUCTOR_CACHE.clear();
236  }
237    
238  static int getCacheSize() {
239    return CONSTRUCTOR_CACHE.size();
240  }
241  /**
242   * A pair of input/output buffers that we use to clone writables.
243   */
244  private static class CopyInCopyOutBuffer {
245    DataOutputBuffer outBuffer = new DataOutputBuffer();
246    DataInputBuffer inBuffer = new DataInputBuffer();
247    /**
248     * Move the data from the output buffer to the input buffer.
249     */
250    void moveData() {
251      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
252    }
253  }
254  
255  /**
256   * Allocate a buffer for each thread that tries to clone objects.
257   */
258  private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers
259      = new ThreadLocal<CopyInCopyOutBuffer>() {
260      protected synchronized CopyInCopyOutBuffer initialValue() {
261        return new CopyInCopyOutBuffer();
262      }
263    };
264
265  private static SerializationFactory getFactory(Configuration conf) {
266    if (serialFactory == null) {
267      serialFactory = new SerializationFactory(conf);
268    }
269    return serialFactory;
270  }
271  
272  /**
273   * Make a copy of the writable object using serialization to a buffer
274   * @param dst the object to copy from
275   * @param src the object to copy into, which is destroyed
276   * @throws IOException
277   */
278  @SuppressWarnings("unchecked")
279  public static <T> T copy(Configuration conf, 
280                                T src, T dst) throws IOException {
281    CopyInCopyOutBuffer buffer = cloneBuffers.get();
282    buffer.outBuffer.reset();
283    SerializationFactory factory = getFactory(conf);
284    Class<T> cls = (Class<T>) src.getClass();
285    Serializer<T> serializer = factory.getSerializer(cls);
286    serializer.open(buffer.outBuffer);
287    serializer.serialize(src);
288    buffer.moveData();
289    Deserializer<T> deserializer = factory.getDeserializer(cls);
290    deserializer.open(buffer.inBuffer);
291    dst = deserializer.deserialize(dst);
292    return dst;
293  }
294
295  @Deprecated
296  public static void cloneWritableInto(Writable dst, 
297                                       Writable src) throws IOException {
298    CopyInCopyOutBuffer buffer = cloneBuffers.get();
299    buffer.outBuffer.reset();
300    src.write(buffer.outBuffer);
301    buffer.moveData();
302    dst.readFields(buffer.inBuffer);
303  }
304}