001/*
002 * AbstractMetricsContext.java
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package org.apache.hadoop.metrics.spi;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.Timer;
033import java.util.TimerTask;
034import java.util.TreeMap;
035import java.util.Map.Entry;
036
037import org.apache.hadoop.classification.InterfaceAudience;
038import org.apache.hadoop.classification.InterfaceStability;
039import org.apache.hadoop.metrics.ContextFactory;
040import org.apache.hadoop.metrics.MetricsContext;
041import org.apache.hadoop.metrics.MetricsException;
042import org.apache.hadoop.metrics.MetricsRecord;
043import org.apache.hadoop.metrics.Updater;
044
045/**
046 * The main class of the Service Provider Interface.  This class should be
047 * extended in order to integrate the Metrics API with a specific metrics
048 * client library. <p/>
049 *
050 * This class implements the internal table of metric data, and the timer
051 * on which data is to be sent to the metrics system.  Subclasses must
052 * override the abstract <code>emitRecord</code> method in order to transmit
053 * the data. <p/>
054 */
055@InterfaceAudience.Public
056@InterfaceStability.Evolving
057public abstract class AbstractMetricsContext implements MetricsContext {
058    
059  private int period = MetricsContext.DEFAULT_PERIOD;
060  private Timer timer = null;
061    
062  private Set<Updater> updaters = new HashSet<Updater>(1);
063  private volatile boolean isMonitoring = false;
064    
065  private ContextFactory factory = null;
066  private String contextName = null;
067    
068  @InterfaceAudience.Private
069  public static class TagMap extends TreeMap<String,Object> {
070    private static final long serialVersionUID = 3546309335061952993L;
071    TagMap() {
072      super();
073    }
074    TagMap(TagMap orig) {
075      super(orig);
076    }
077    /**
078     * Returns true if this tagmap contains every tag in other.
079     */
080    public boolean containsAll(TagMap other) {
081      for (Map.Entry<String,Object> entry : other.entrySet()) {
082        Object value = get(entry.getKey());
083        if (value == null || !value.equals(entry.getValue())) {
084          // either key does not exist here, or the value is different
085          return false;
086        }
087      }
088      return true;
089    }
090  }
091  
092  @InterfaceAudience.Private
093  public static class MetricMap extends TreeMap<String,Number> {
094    private static final long serialVersionUID = -7495051861141631609L;
095    MetricMap() {
096      super();
097    }
098    MetricMap(MetricMap orig) {
099      super(orig);
100    }
101  }
102            
103  static class RecordMap extends HashMap<TagMap,MetricMap> {
104    private static final long serialVersionUID = 259835619700264611L;
105  }
106    
107  private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
108    
109
110  /**
111   * Creates a new instance of AbstractMetricsContext
112   */
113  protected AbstractMetricsContext() {
114  }
115    
116  /**
117   * Initializes the context.
118   */
119  public void init(String contextName, ContextFactory factory) 
120  {
121    this.contextName = contextName;
122    this.factory = factory;
123  }
124    
125  /**
126   * Convenience method for subclasses to access factory attributes.
127   */
128  protected String getAttribute(String attributeName) {
129    String factoryAttribute = contextName + "." + attributeName;
130    return (String) factory.getAttribute(factoryAttribute);  
131  }
132    
133  /**
134   * Returns an attribute-value map derived from the factory attributes
135   * by finding all factory attributes that begin with 
136   * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
137   * those attributes with the contextName and tableName stripped off.
138   */
139  protected Map<String,String> getAttributeTable(String tableName) {
140    String prefix = contextName + "." + tableName + ".";
141    Map<String,String> result = new HashMap<String,String>();
142    for (String attributeName : factory.getAttributeNames()) {
143      if (attributeName.startsWith(prefix)) {
144        String name = attributeName.substring(prefix.length());
145        String value = (String) factory.getAttribute(attributeName);
146        result.put(name, value);
147      }
148    }
149    return result;
150  }
151    
152  /**
153   * Returns the context name.
154   */
155  public String getContextName() {
156    return contextName;
157  }
158    
159  /**
160   * Returns the factory by which this context was created.
161   */
162  public ContextFactory getContextFactory() {
163    return factory;
164  }
165    
166  /**
167   * Starts or restarts monitoring, the emitting of metrics records.
168   */
169  public synchronized void startMonitoring()
170    throws IOException {
171    if (!isMonitoring) {
172      startTimer();
173      isMonitoring = true;
174    }
175  }
176    
177  /**
178   * Stops monitoring.  This does not free buffered data. 
179   * @see #close()
180   */
181  public synchronized void stopMonitoring() {
182    if (isMonitoring) {
183      stopTimer();
184      isMonitoring = false;
185    }
186  }
187    
188  /**
189   * Returns true if monitoring is currently in progress.
190   */
191  public boolean isMonitoring() {
192    return isMonitoring;
193  }
194    
195  /**
196   * Stops monitoring and frees buffered data, returning this
197   * object to its initial state.  
198   */
199  public synchronized void close() {
200    stopMonitoring();
201    clearUpdaters();
202  } 
203    
204  /**
205   * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
206   * Throws an exception if the metrics implementation is configured with a fixed
207   * set of record names and <code>recordName</code> is not in that set.
208   * 
209   * @param recordName the name of the record
210   * @throws MetricsException if recordName conflicts with configuration data
211   */
212  public final synchronized MetricsRecord createRecord(String recordName) {
213    if (bufferedData.get(recordName) == null) {
214      bufferedData.put(recordName, new RecordMap());
215    }
216    return newRecord(recordName);
217  }
218    
219  /**
220   * Subclasses should override this if they subclass MetricsRecordImpl.
221   * @param recordName the name of the record
222   * @return newly created instance of MetricsRecordImpl or subclass
223   */
224  protected MetricsRecord newRecord(String recordName) {
225    return new MetricsRecordImpl(recordName, this);
226  }
227    
228  /**
229   * Registers a callback to be called at time intervals determined by
230   * the configuration.
231   *
232   * @param updater object to be run periodically; it should update
233   * some metrics records 
234   */
235  public synchronized void registerUpdater(final Updater updater) {
236    if (!updaters.contains(updater)) {
237      updaters.add(updater);
238    }
239  }
240    
241  /**
242   * Removes a callback, if it exists.
243   *
244   * @param updater object to be removed from the callback list
245   */
246  public synchronized void unregisterUpdater(Updater updater) {
247    updaters.remove(updater);
248  }
249    
250  private synchronized void clearUpdaters() {
251    updaters.clear();
252  }
253    
254  /**
255   * Starts timer if it is not already started
256   */
257  private synchronized void startTimer() {
258    if (timer == null) {
259      timer = new Timer("Timer thread for monitoring " + getContextName(), 
260                        true);
261      TimerTask task = new TimerTask() {
262          public void run() {
263            try {
264              timerEvent();
265            }
266            catch (IOException ioe) {
267              ioe.printStackTrace();
268            }
269          }
270        };
271      long millis = period * 1000;
272      timer.scheduleAtFixedRate(task, millis, millis);
273    }
274  }
275    
276  /**
277   * Stops timer if it is running
278   */
279  private synchronized void stopTimer() {
280    if (timer != null) {
281      timer.cancel();
282      timer = null;
283    }
284  }
285    
286  /**
287   * Timer callback.
288   */
289  private void timerEvent() throws IOException {
290    if (isMonitoring) {
291      Collection<Updater> myUpdaters;
292      synchronized (this) {
293        myUpdaters = new ArrayList<Updater>(updaters);
294      }     
295      // Run all the registered updates without holding a lock
296      // on this context
297      for (Updater updater : myUpdaters) {
298        try {
299          updater.doUpdates(this);
300        }
301        catch (Throwable throwable) {
302          throwable.printStackTrace();
303        }
304      }
305      emitRecords();
306    }
307  }
308    
309  /**
310   *  Emits the records.
311   */
312  private synchronized void emitRecords() throws IOException {
313    for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
314      RecordMap recordMap = recordEntry.getValue();
315      synchronized (recordMap) {
316        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
317        for (Entry<TagMap, MetricMap> entry : entrySet) {
318          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
319          emitRecord(contextName, recordEntry.getKey(), outRec);
320        }
321      }
322    }
323    flush();
324  }
325  
326  /**
327   * Retrieves all the records managed by this MetricsContext.
328   * Useful for monitoring systems that are polling-based.
329   * @return A non-null collection of all monitoring records.
330   */
331  public synchronized Map<String, Collection<OutputRecord>> getAllRecords() {
332    Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>();
333    for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
334      RecordMap recordMap = recordEntry.getValue();
335      synchronized (recordMap) {
336        List<OutputRecord> records = new ArrayList<OutputRecord>();
337        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet();
338        for (Entry<TagMap, MetricMap> entry : entrySet) {
339          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
340          records.add(outRec);
341        }
342        out.put(recordEntry.getKey(), records);
343      }
344    }
345    return out;
346  }
347
348  /**
349   * Sends a record to the metrics system.
350   */
351  protected abstract void emitRecord(String contextName, String recordName, 
352                                     OutputRecord outRec) throws IOException;
353    
354  /**
355   * Called each period after all records have been emitted, this method does nothing.
356   * Subclasses may override it in order to perform some kind of flush.
357   */
358  protected void flush() throws IOException {
359  }
360    
361  /**
362   * Called by MetricsRecordImpl.update().  Creates or updates a row in
363   * the internal table of metric data.
364   */
365  protected void update(MetricsRecordImpl record) {
366    String recordName = record.getRecordName();
367    TagMap tagTable = record.getTagTable();
368    Map<String,MetricValue> metricUpdates = record.getMetricTable();
369        
370    RecordMap recordMap = getRecordMap(recordName);
371    synchronized (recordMap) {
372      MetricMap metricMap = recordMap.get(tagTable);
373      if (metricMap == null) {
374        metricMap = new MetricMap();
375        TagMap tagMap = new TagMap(tagTable); // clone tags
376        recordMap.put(tagMap, metricMap);
377      }
378
379      Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
380      for (Entry<String, MetricValue> entry : entrySet) {
381        String metricName = entry.getKey ();
382        MetricValue updateValue = entry.getValue ();
383        Number updateNumber = updateValue.getNumber();
384        Number currentNumber = metricMap.get(metricName);
385        if (currentNumber == null || updateValue.isAbsolute()) {
386          metricMap.put(metricName, updateNumber);
387        }
388        else {
389          Number newNumber = sum(updateNumber, currentNumber);
390          metricMap.put(metricName, newNumber);
391        }
392      }
393    }
394  }
395    
396  private synchronized RecordMap getRecordMap(String recordName) {
397    return bufferedData.get(recordName);
398  }
399    
400  /**
401   * Adds two numbers, coercing the second to the type of the first.
402   *
403   */
404  private Number sum(Number a, Number b) {
405    if (a instanceof Integer) {
406      return Integer.valueOf(a.intValue() + b.intValue());
407    }
408    else if (a instanceof Float) {
409      return new Float(a.floatValue() + b.floatValue());
410    }
411    else if (a instanceof Short) {
412      return Short.valueOf((short)(a.shortValue() + b.shortValue()));
413    }
414    else if (a instanceof Byte) {
415      return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
416    }
417    else if (a instanceof Long) {
418      return Long.valueOf((a.longValue() + b.longValue()));
419    }
420    else {
421      // should never happen
422      throw new MetricsException("Invalid number type");
423    }
424            
425  }
426    
427  /**
428   * Called by MetricsRecordImpl.remove().  Removes all matching rows in
429   * the internal table of metric data.  A row matches if it has the same
430   * tag names and values as record, but it may also have additional
431   * tags.
432   */    
433  protected void remove(MetricsRecordImpl record) {
434    String recordName = record.getRecordName();
435    TagMap tagTable = record.getTagTable();
436        
437    RecordMap recordMap = getRecordMap(recordName);
438    synchronized (recordMap) {
439      Iterator<TagMap> it = recordMap.keySet().iterator();
440      while (it.hasNext()) {
441        TagMap rowTags = it.next();
442        if (rowTags.containsAll(tagTable)) {
443          it.remove();
444        }
445      }
446    }
447  }
448    
449  /**
450   * Returns the timer period.
451   */
452  public int getPeriod() {
453    return period;
454  }
455    
456  /**
457   * Sets the timer period
458   */
459  protected void setPeriod(int period) {
460    this.period = period;
461  }
462  
463  /**
464   * If a period is set in the attribute passed in, override
465   * the default with it.
466   */
467  protected void parseAndSetPeriod(String attributeName) {
468    String periodStr = getAttribute(attributeName);
469    if (periodStr != null) {
470      int period = 0;
471      try {
472        period = Integer.parseInt(periodStr);
473      } catch (NumberFormatException nfe) {
474      }
475      if (period <= 0) {
476        throw new MetricsException("Invalid period: " + periodStr);
477      }
478      setPeriod(period);
479    }
480  }
481}