001    /*
002     * AbstractMetricsContext.java
003     *
004     * Licensed to the Apache Software Foundation (ASF) under one
005     * or more contributor license agreements.  See the NOTICE file
006     * distributed with this work for additional information
007     * regarding copyright ownership.  The ASF licenses this file
008     * to you under the Apache License, Version 2.0 (the
009     * "License"); you may not use this file except in compliance
010     * with the License.  You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package org.apache.hadoop.metrics.spi;
022    
023    import java.io.IOException;
024    import java.util.ArrayList;
025    import java.util.Collection;
026    import java.util.HashMap;
027    import java.util.HashSet;
028    import java.util.Iterator;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    import java.util.Timer;
033    import java.util.TimerTask;
034    import java.util.TreeMap;
035    import java.util.Map.Entry;
036    
037    import org.apache.hadoop.classification.InterfaceAudience;
038    import org.apache.hadoop.classification.InterfaceStability;
039    import org.apache.hadoop.metrics.ContextFactory;
040    import org.apache.hadoop.metrics.MetricsContext;
041    import org.apache.hadoop.metrics.MetricsException;
042    import org.apache.hadoop.metrics.MetricsRecord;
043    import org.apache.hadoop.metrics.Updater;
044    
/**
 * The main class of the Service Provider Interface.  This class should be
 * extended in order to integrate the Metrics API with a specific metrics
 * client library.
 *
 * <p>This class implements the internal table of metric data, and the timer
 * on which data is to be sent to the metrics system.  Subclasses must
 * implement the abstract <code>emitRecord</code> method in order to transmit
 * the data.
 */
055    @InterfaceAudience.Public
056    @InterfaceStability.Evolving
057    public abstract class AbstractMetricsContext implements MetricsContext {
058        
  // Interval between timer events, in seconds; startTimer() converts it to
  // milliseconds when the timer is scheduled.
  private int period = MetricsContext.DEFAULT_PERIOD;
  // Timer that drives periodic emission; null whenever no timer is running.
  private Timer timer = null;

  // Callbacks run on every timer event, before the records are emitted.
  private Set<Updater> updaters = new HashSet<Updater>(1);
  // Set by startMonitoring() and cleared by stopMonitoring().  Volatile
  // because isMonitoring() and timerEvent() read it without taking the lock.
  private volatile boolean isMonitoring = false;

  // Factory and context name supplied through init(); null until then.
  private ContextFactory factory = null;
  private String contextName = null;
067        
068      @InterfaceAudience.Private
069      public static class TagMap extends TreeMap<String,Object> {
070        private static final long serialVersionUID = 3546309335061952993L;
071        TagMap() {
072          super();
073        }
074        TagMap(TagMap orig) {
075          super(orig);
076        }
077        /**
078         * Returns true if this tagmap contains every tag in other.
079         */
080        public boolean containsAll(TagMap other) {
081          for (Map.Entry<String,Object> entry : other.entrySet()) {
082            Object value = get(entry.getKey());
083            if (value == null || !value.equals(entry.getValue())) {
084              // either key does not exist here, or the value is different
085              return false;
086            }
087          }
088          return true;
089        }
090      }
091      
  /**
   * Sorted map from metric name to the current value of that metric.
   */
  @InterfaceAudience.Private
  public static class MetricMap extends TreeMap<String,Number> {
    private static final long serialVersionUID = -7495051861141631609L;
    /** Creates an empty metric map. */
    MetricMap() {
      super();
    }
    /** Creates a metric map holding a copy of the entries of <code>orig</code>. */
    MetricMap(MetricMap orig) {
      super(orig);
    }
  }
102                
  /**
   * The rows buffered for one record name: each row's tag set maps to the
   * metric values of that row.  Instances are also used as the lock that
   * guards their own contents.
   */
  static class RecordMap extends HashMap<TagMap,MetricMap> {
    private static final long serialVersionUID = 259835619700264611L;
  }
106        
  // Internal table of metric data: record name -> rows buffered for that
  // record.  Entries are added by createRecord().
  private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
108        
109    
  /**
   * Creates a new instance of AbstractMetricsContext.  The context name and
   * factory remain unset until {@link #init(String, ContextFactory)} is
   * called.
   */
  protected AbstractMetricsContext() {
  }
115        
116      /**
117       * Initializes the context.
118       */
119      public void init(String contextName, ContextFactory factory) 
120      {
121        this.contextName = contextName;
122        this.factory = factory;
123      }
124        
125      /**
126       * Convenience method for subclasses to access factory attributes.
127       */
128      protected String getAttribute(String attributeName) {
129        String factoryAttribute = contextName + "." + attributeName;
130        return (String) factory.getAttribute(factoryAttribute);  
131      }
132        
133      /**
134       * Returns an attribute-value map derived from the factory attributes
135       * by finding all factory attributes that begin with 
136       * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
137       * those attributes with the contextName and tableName stripped off.
138       */
139      protected Map<String,String> getAttributeTable(String tableName) {
140        String prefix = contextName + "." + tableName + ".";
141        Map<String,String> result = new HashMap<String,String>();
142        for (String attributeName : factory.getAttributeNames()) {
143          if (attributeName.startsWith(prefix)) {
144            String name = attributeName.substring(prefix.length());
145            String value = (String) factory.getAttribute(attributeName);
146            result.put(name, value);
147          }
148        }
149        return result;
150      }
151        
  /**
   * Returns the context name.
   *
   * @return the name passed to <code>init</code>, or null if
   *         <code>init</code> has not been called
   */
  public String getContextName() {
    return contextName;
  }
158        
  /**
   * Returns the factory by which this context was created.
   *
   * @return the factory passed to <code>init</code>, or null if
   *         <code>init</code> has not been called
   */
  public ContextFactory getContextFactory() {
    return factory;
  }
165        
166      /**
167       * Starts or restarts monitoring, the emitting of metrics records.
168       */
169      public synchronized void startMonitoring()
170        throws IOException {
171        if (!isMonitoring) {
172          startTimer();
173          isMonitoring = true;
174        }
175      }
176        
177      /**
178       * Stops monitoring.  This does not free buffered data. 
179       * @see #close()
180       */
181      public synchronized void stopMonitoring() {
182        if (isMonitoring) {
183          stopTimer();
184          isMonitoring = false;
185        }
186      }
187        
  /**
   * Returns true if monitoring is currently in progress.  Reads the volatile
   * flag directly, without taking the context's lock.
   *
   * @return the current value of the monitoring flag
   */
  public boolean isMonitoring() {
    return isMonitoring;
  }
194        
  /**
   * Stops monitoring and unregisters all updaters.
   *
   * NOTE(review): the table of buffered metric data is not cleared here, so
   * the context is not fully returned to its initial state.
   */
  public synchronized void close() {
    stopMonitoring();
    clearUpdaters();
  } 
203        
204      /**
205       * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
206       * Throws an exception if the metrics implementation is configured with a fixed
207       * set of record names and <code>recordName</code> is not in that set.
208       * 
209       * @param recordName the name of the record
210       * @throws MetricsException if recordName conflicts with configuration data
211       */
212      public final synchronized MetricsRecord createRecord(String recordName) {
213        if (bufferedData.get(recordName) == null) {
214          bufferedData.put(recordName, new RecordMap());
215        }
216        return newRecord(recordName);
217      }
218        
  /**
   * Instantiates the record object handed out by <code>createRecord</code>.
   * Subclasses should override this if they subclass MetricsRecordImpl.
   *
   * @param recordName the name of the record
   * @return newly created instance of MetricsRecordImpl or subclass, bound
   *         to this context
   */
  protected MetricsRecord newRecord(String recordName) {
    return new MetricsRecordImpl(recordName, this);
  }
227        
228      /**
229       * Registers a callback to be called at time intervals determined by
230       * the configuration.
231       *
232       * @param updater object to be run periodically; it should update
233       * some metrics records 
234       */
235      public synchronized void registerUpdater(final Updater updater) {
236        if (!updaters.contains(updater)) {
237          updaters.add(updater);
238        }
239      }
240        
  /**
   * Removes a callback, if it exists.  Does nothing when the updater was
   * never registered.
   *
   * @param updater object to be removed from the callback list
   */
  public synchronized void unregisterUpdater(Updater updater) {
    updaters.remove(updater);
  }
249        
  /**
   * Removes every registered updater.
   */
  private synchronized void clearUpdaters() {
    updaters.clear();
  }
253        
254      /**
255       * Starts timer if it is not already started
256       */
257      private synchronized void startTimer() {
258        if (timer == null) {
259          timer = new Timer("Timer thread for monitoring " + getContextName(), 
260                            true);
261          TimerTask task = new TimerTask() {
262              public void run() {
263                try {
264                  timerEvent();
265                }
266                catch (IOException ioe) {
267                  ioe.printStackTrace();
268                }
269              }
270            };
271          long millis = period * 1000;
272          timer.scheduleAtFixedRate(task, millis, millis);
273        }
274      }
275        
276      /**
277       * Stops timer if it is running
278       */
279      private synchronized void stopTimer() {
280        if (timer != null) {
281          timer.cancel();
282          timer = null;
283        }
284      }
285        
286      /**
287       * Timer callback.
288       */
289      private void timerEvent() throws IOException {
290        if (isMonitoring) {
291          Collection<Updater> myUpdaters;
292          synchronized (this) {
293            myUpdaters = new ArrayList<Updater>(updaters);
294          }     
295          // Run all the registered updates without holding a lock
296          // on this context
297          for (Updater updater : myUpdaters) {
298            try {
299              updater.doUpdates(this);
300            }
301            catch (Throwable throwable) {
302              throwable.printStackTrace();
303            }
304          }
305          emitRecords();
306        }
307      }
308        
309      /**
310       *  Emits the records.
311       */
312      private synchronized void emitRecords() throws IOException {
313        for (String recordName : bufferedData.keySet()) {
314          RecordMap recordMap = bufferedData.get(recordName);
315          synchronized (recordMap) {
316            Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
317            for (Entry<TagMap, MetricMap> entry : entrySet) {
318              OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
319              emitRecord(contextName, recordName, outRec);
320            }
321          }
322        }
323        flush();
324      }
325      
326      /**
327       * Retrieves all the records managed by this MetricsContext.
328       * Useful for monitoring systems that are polling-based.
329       * @return A non-null collection of all monitoring records.
330       */
331      public synchronized Map<String, Collection<OutputRecord>> getAllRecords() {
332        Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>();
333        for (String recordName : bufferedData.keySet()) {
334          RecordMap recordMap = bufferedData.get(recordName);
335          synchronized (recordMap) {
336            List<OutputRecord> records = new ArrayList<OutputRecord>();
337            Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet();
338            for (Entry<TagMap, MetricMap> entry : entrySet) {
339              OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
340              records.add(outRec);
341            }
342            out.put(recordName, records);
343          }
344        }
345        return out;
346      }
347    
  /**
   * Sends a record to the metrics system.  When called from the periodic
   * emission, the caller holds this context's lock and the lock of the
   * record map the row belongs to.
   *
   * @param contextName the name of this context
   * @param recordName the name of the record the row belongs to
   * @param outRec the tags and metric values of the row
   * @throws IOException if the record cannot be sent
   */
  protected abstract void emitRecord(String contextName, String recordName, 
                                     OutputRecord outRec) throws IOException;
353        
  /**
   * Called each period after all records have been emitted, this method does nothing.
   * Subclasses may override it in order to perform some kind of flush.
   *
   * @throws IOException declared so that overriding subclasses may report
   *         a failed flush
   */
  protected void flush() throws IOException {
  }
360        
361      /**
362       * Called by MetricsRecordImpl.update().  Creates or updates a row in
363       * the internal table of metric data.
364       */
365      protected void update(MetricsRecordImpl record) {
366        String recordName = record.getRecordName();
367        TagMap tagTable = record.getTagTable();
368        Map<String,MetricValue> metricUpdates = record.getMetricTable();
369            
370        RecordMap recordMap = getRecordMap(recordName);
371        synchronized (recordMap) {
372          MetricMap metricMap = recordMap.get(tagTable);
373          if (metricMap == null) {
374            metricMap = new MetricMap();
375            TagMap tagMap = new TagMap(tagTable); // clone tags
376            recordMap.put(tagMap, metricMap);
377          }
378    
379          Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
380          for (Entry<String, MetricValue> entry : entrySet) {
381            String metricName = entry.getKey ();
382            MetricValue updateValue = entry.getValue ();
383            Number updateNumber = updateValue.getNumber();
384            Number currentNumber = metricMap.get(metricName);
385            if (currentNumber == null || updateValue.isAbsolute()) {
386              metricMap.put(metricName, updateNumber);
387            }
388            else {
389              Number newNumber = sum(updateNumber, currentNumber);
390              metricMap.put(metricName, newNumber);
391            }
392          }
393        }
394      }
395        
  /**
   * Looks up the rows buffered for a record name, under the context's lock.
   *
   * @param recordName the name of the record
   * @return the record map stored under that name, or null if there is none
   */
  private synchronized RecordMap getRecordMap(String recordName) {
    return bufferedData.get(recordName);
  }
399        
400      /**
401       * Adds two numbers, coercing the second to the type of the first.
402       *
403       */
404      private Number sum(Number a, Number b) {
405        if (a instanceof Integer) {
406          return Integer.valueOf(a.intValue() + b.intValue());
407        }
408        else if (a instanceof Float) {
409          return new Float(a.floatValue() + b.floatValue());
410        }
411        else if (a instanceof Short) {
412          return Short.valueOf((short)(a.shortValue() + b.shortValue()));
413        }
414        else if (a instanceof Byte) {
415          return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
416        }
417        else if (a instanceof Long) {
418          return Long.valueOf((a.longValue() + b.longValue()));
419        }
420        else {
421          // should never happen
422          throw new MetricsException("Invalid number type");
423        }
424                
425      }
426        
427      /**
428       * Called by MetricsRecordImpl.remove().  Removes all matching rows in
429       * the internal table of metric data.  A row matches if it has the same
430       * tag names and values as record, but it may also have additional
431       * tags.
432       */    
433      protected void remove(MetricsRecordImpl record) {
434        String recordName = record.getRecordName();
435        TagMap tagTable = record.getTagTable();
436            
437        RecordMap recordMap = getRecordMap(recordName);
438        synchronized (recordMap) {
439          Iterator<TagMap> it = recordMap.keySet().iterator();
440          while (it.hasNext()) {
441            TagMap rowTags = it.next();
442            if (rowTags.containsAll(tagTable)) {
443              it.remove();
444            }
445          }
446        }
447      }
448        
  /**
   * Returns the timer period.
   *
   * @return the period in seconds
   */
  public int getPeriod() {
    return period;
  }
455        
  /**
   * Sets the timer period.  The value is stored without validation and is
   * only read when the timer is started, so a timer that is already running
   * keeps its old period.
   *
   * @param period the new period in seconds
   */
  protected void setPeriod(int period) {
    this.period = period;
  }
462      
463      /**
464       * If a period is set in the attribute passed in, override
465       * the default with it.
466       */
467      protected void parseAndSetPeriod(String attributeName) {
468        String periodStr = getAttribute(attributeName);
469        if (periodStr != null) {
470          int period = 0;
471          try {
472            period = Integer.parseInt(periodStr);
473          } catch (NumberFormatException nfe) {
474          }
475          if (period <= 0) {
476            throw new MetricsException("Invalid period: " + periodStr);
477          }
478          setPeriod(period);
479        }
480      }
481    }