/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.util;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;

/**
 * General reflection utils
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ReflectionUtils {

  private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
  volatile private static SerializationFactory serialFactory = null;

  /**
   * Cache of constructors for each class. Pins the classes so they
   * can't be garbage collected until ReflectionUtils can be collected.
   */
  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
    new ConcurrentHashMap<Class<?>, Constructor<?>>();

  /**
   * Check and set 'configuration' if necessary.
   *
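   * <p>For example (a sketch; <code>codec</code> stands for any object that may
   * or may not implement <code>Configurable</code>):
   * <pre>
   *   ReflectionUtils.setConf(codec, conf);
   * </pre>
   *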
   * @param theObject object for which to set configuration
   * @param conf Configuration
   */
  public static void setConf(Object theObject, Configuration conf) {
    if (conf != null) {
      if (theObject instanceof Configurable) {
        ((Configurable) theObject).setConf(conf);
      }
      setJobConf(theObject, conf);
    }
  }

  /**
   * This code supports backward compatibility and breaks the compile-time
   * dependency of core on mapred.
   * It should be deprecated along with the mapred package (HADOOP-1230) and
   * removed once the mapred package is removed.
   */
  private static void setJobConf(Object theObject, Configuration conf) {
    // If JobConf and JobConfigurable are in classpath, AND
    // theObject is of type JobConfigurable AND
    // conf is of type JobConf then
    // invoke configure on theObject
    try {
      Class<?> jobConfClass =
        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConf");
      if (jobConfClass == null) {
        return;
      }

      Class<?> jobConfigurableClass =
        conf.getClassByNameOrNull("org.apache.hadoop.mapred.JobConfigurable");
      if (jobConfigurableClass == null) {
        return;
      }
      if (jobConfClass.isAssignableFrom(conf.getClass()) &&
            jobConfigurableClass.isAssignableFrom(theObject.getClass())) {
        Method configureMethod =
          jobConfigurableClass.getMethod("configure", jobConfClass);
        configureMethod.invoke(theObject, conf);
      }
    } catch (Exception e) {
      throw new RuntimeException("Error in configuring object", e);
    }
  }

  /** Create an object for the given class and initialize it from conf.
   *
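   * <p>For example, a typical use might look like the following sketch, where
   * <code>MyType</code> is a hypothetical class with a no-argument constructor:
   * <pre>
   *   Configuration conf = new Configuration();
   *   MyType instance = ReflectionUtils.newInstance(MyType.class, conf);
   * </pre>
   *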
   * @param theClass class of which an object is created
   * @param conf Configuration
   * @return a new object
   */
  @SuppressWarnings("unchecked")
  public static <T> T newInstance(Class<T> theClass, Configuration conf) {
    T result;
    try {
      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
      if (meth == null) {
        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
        meth.setAccessible(true);
        CONSTRUCTOR_CACHE.put(theClass, meth);
      }
      result = meth.newInstance();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    setConf(result, conf);
    return result;
  }

  static private ThreadMXBean threadBean =
    ManagementFactory.getThreadMXBean();

  public static void setContentionTracing(boolean val) {
    threadBean.setThreadContentionMonitoringEnabled(val);
  }

  private static String getTaskName(long id, String name) {
    if (name == null) {
      return Long.toString(id);
    }
    return id + " (" + name + ")";
  }

  /**
   * Print all of the threads' information and stack traces.
   *
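   * <p>For example, to write a dump of all threads to standard output
   * (a sketch):
   * <pre>
   *   ReflectionUtils.printThreadInfo(new PrintWriter(System.out), "periodic dump");
   * </pre>
   *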
   * @param stream the stream to write the thread dump to
   * @param title a string title for the stack trace
   */
  public synchronized static void printThreadInfo(PrintWriter stream,
                                                  String title) {
    final int STACK_DEPTH = 20;
    boolean contention = threadBean.isThreadContentionMonitoringEnabled();
    long[] threadIds = threadBean.getAllThreadIds();
    stream.println("Process Thread Dump: " + title);
    stream.println(threadIds.length + " active threads");
    for (long tid: threadIds) {
      ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
      if (info == null) {
        stream.println("  Inactive");
        continue;
      }
      stream.println("Thread " +
                     getTaskName(info.getThreadId(),
                                 info.getThreadName()) + ":");
      Thread.State state = info.getThreadState();
      stream.println("  State: " + state);
      stream.println("  Blocked count: " + info.getBlockedCount());
      stream.println("  Waited count: " + info.getWaitedCount());
      if (contention) {
        stream.println("  Blocked time: " + info.getBlockedTime());
        stream.println("  Waited time: " + info.getWaitedTime());
      }
      if (state == Thread.State.WAITING) {
        stream.println("  Waiting on " + info.getLockName());
      } else if (state == Thread.State.BLOCKED) {
        stream.println("  Blocked on " + info.getLockName());
        stream.println("  Blocked by " +
                       getTaskName(info.getLockOwnerId(),
                                   info.getLockOwnerName()));
      }
      stream.println("  Stack:");
      for (StackTraceElement frame: info.getStackTrace()) {
        stream.println("    " + frame.toString());
      }
    }
    stream.flush();
  }

  private static long previousLogTime = 0;

  /**
   * Log the current thread stacks at INFO level.
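   *
   * <p>For example, to log the stacks at most once every 30 seconds (a sketch;
   * <code>LOG</code> is assumed to be the caller's commons-logging
   * <code>Log</code>):
   * <pre>
   *   ReflectionUtils.logThreadInfo(LOG, "lock contention", 30);
   * </pre>
   *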
   * @param log the logger that logs the stack trace
   * @param title a descriptive title for the call stacks
   * @param minInterval the minimum time, in seconds, since the stacks were
   *                    last logged
   */
  public static void logThreadInfo(Log log,
                                   String title,
                                   long minInterval) {
    boolean dumpStack = false;
    if (log.isInfoEnabled()) {
      synchronized (ReflectionUtils.class) {
        long now = Time.now();
        if (now - previousLogTime >= minInterval * 1000) {
          previousLogTime = now;
          dumpStack = true;
        }
      }
      if (dumpStack) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        printThreadInfo(new PrintWriter(buffer), title);
        log.info(buffer.toString());
      }
    }
  }

  /**
   * Return the correctly-typed {@link Class} of the given object.
   *
   * @param o object whose correctly-typed <code>Class</code> is to be obtained
   * @return the correctly typed <code>Class</code> of the given object.
   */
  @SuppressWarnings("unchecked")
  public static <T> Class<T> getClass(T o) {
    return (Class<T>)o.getClass();
  }

  // methods to support testing
  static void clearCache() {
    CONSTRUCTOR_CACHE.clear();
  }

  static int getCacheSize() {
    return CONSTRUCTOR_CACHE.size();
  }

  /**
   * A pair of input/output buffers that we use to clone writables.
   */
  private static class CopyInCopyOutBuffer {
    DataOutputBuffer outBuffer = new DataOutputBuffer();
    DataInputBuffer inBuffer = new DataInputBuffer();
    /**
     * Move the data from the output buffer to the input buffer.
     */
    void moveData() {
      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
    }
  }

  /**
   * Allocate a buffer for each thread that tries to clone objects.
   */
  private static ThreadLocal<CopyInCopyOutBuffer> cloneBuffers
      = new ThreadLocal<CopyInCopyOutBuffer>() {
      @Override
      protected synchronized CopyInCopyOutBuffer initialValue() {
        return new CopyInCopyOutBuffer();
      }
    };

  private static SerializationFactory getFactory(Configuration conf) {
    if (serialFactory == null) {
      serialFactory = new SerializationFactory(conf);
    }
    return serialFactory;
  }

  /**
   * Make a copy of an object using serialization to a buffer.
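   *
   * <p>For example, to clone a Writable through the buffer (a sketch; assumes
   * <code>conf</code> is an existing Configuration and the default Writable
   * serialization is registered):
   * <pre>
   *   IntWritable src = new IntWritable(42);
   *   IntWritable dst = new IntWritable();
   *   dst = ReflectionUtils.copy(conf, src, dst);
   * </pre>
   *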
   * @param conf the configuration
   * @param src the object to copy from
   * @param dst the object to copy into, which is destroyed
   * @return dst param (the copy)
   * @throws IOException if serialization or deserialization fails
   */
  @SuppressWarnings("unchecked")
  public static <T> T copy(Configuration conf,
                           T src, T dst) throws IOException {
    CopyInCopyOutBuffer buffer = cloneBuffers.get();
    buffer.outBuffer.reset();
    SerializationFactory factory = getFactory(conf);
    Class<T> cls = (Class<T>) src.getClass();
    Serializer<T> serializer = factory.getSerializer(cls);
    serializer.open(buffer.outBuffer);
    serializer.serialize(src);
    buffer.moveData();
    Deserializer<T> deserializer = factory.getDeserializer(cls);
    deserializer.open(buffer.inBuffer);
    dst = deserializer.deserialize(dst);
    return dst;
  }

  @Deprecated
  public static void cloneWritableInto(Writable dst,
                                       Writable src) throws IOException {
    CopyInCopyOutBuffer buffer = cloneBuffers.get();
    buffer.outBuffer.reset();
    src.write(buffer.outBuffer);
    buffer.moveData();
    dst.readFields(buffer.inBuffer);
  }

  /**
   * Gets all the declared fields of a class including fields declared in
   * superclasses.
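   *
   * <p>For example, to list every field name on a type and its ancestors
   * (a sketch; <code>MyBean</code> is a hypothetical class):
   * <pre>
   *   for (Field f : ReflectionUtils.getDeclaredFieldsIncludingInherited(MyBean.class)) {
   *     System.out.println(f.getName());
   *   }
   * </pre>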
   */
  public static List<Field> getDeclaredFieldsIncludingInherited(Class<?> clazz) {
    List<Field> fields = new ArrayList<Field>();
    while (clazz != null) {
      for (Field field : clazz.getDeclaredFields()) {
        fields.add(field);
      }
      clazz = clazz.getSuperclass();
    }

    return fields;
  }

  /**
   * Gets all the declared methods of a class including methods declared in
   * superclasses.
   */
  public static List<Method> getDeclaredMethodsIncludingInherited(Class<?> clazz) {
    List<Method> methods = new ArrayList<Method>();
    while (clazz != null) {
      for (Method method : clazz.getDeclaredMethods()) {
        methods.add(method);
      }
      clazz = clazz.getSuperclass();
    }

    return methods;
  }
}