001/*
002 * AbstractMetricsContext.java
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package org.apache.hadoop.metrics.spi;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.Timer;
033import java.util.TimerTask;
034import java.util.TreeMap;
035import java.util.Map.Entry;
036
037import org.apache.hadoop.classification.InterfaceAudience;
038import org.apache.hadoop.classification.InterfaceStability;
039import org.apache.hadoop.metrics.ContextFactory;
040import org.apache.hadoop.metrics.MetricsContext;
041import org.apache.hadoop.metrics.MetricsException;
042import org.apache.hadoop.metrics.MetricsRecord;
043import org.apache.hadoop.metrics.Updater;
044
/**
 * The main class of the Service Provider Interface.  This class should be
 * extended in order to integrate the Metrics API with a specific metrics
 * client library.
 *
 * <p>This class implements the internal table of metric data, and the timer
 * on which data is to be sent to the metrics system.  Subclasses must
 * override the abstract <code>emitRecord</code> method in order to transmit
 * the data.
 *
 * @deprecated Use org.apache.hadoop.metrics2 package instead.
 */
057@Deprecated
058@InterfaceAudience.Public
059@InterfaceStability.Evolving
060public abstract class AbstractMetricsContext implements MetricsContext {
061    
062  private int period = MetricsContext.DEFAULT_PERIOD;
063  private Timer timer = null;
064    
065  private Set<Updater> updaters = new HashSet<Updater>(1);
066  private volatile boolean isMonitoring = false;
067    
068  private ContextFactory factory = null;
069  private String contextName = null;
070    
071  @InterfaceAudience.Private
072  public static class TagMap extends TreeMap<String,Object> {
073    private static final long serialVersionUID = 3546309335061952993L;
074    TagMap() {
075      super();
076    }
077    TagMap(TagMap orig) {
078      super(orig);
079    }
080    /**
081     * Returns true if this tagmap contains every tag in other.
082     */
083    public boolean containsAll(TagMap other) {
084      for (Map.Entry<String,Object> entry : other.entrySet()) {
085        Object value = get(entry.getKey());
086        if (value == null || !value.equals(entry.getValue())) {
087          // either key does not exist here, or the value is different
088          return false;
089        }
090      }
091      return true;
092    }
093  }
094  
095  @InterfaceAudience.Private
096  public static class MetricMap extends TreeMap<String,Number> {
097    private static final long serialVersionUID = -7495051861141631609L;
098    MetricMap() {
099      super();
100    }
101    MetricMap(MetricMap orig) {
102      super(orig);
103    }
104  }
105            
106  static class RecordMap extends HashMap<TagMap,MetricMap> {
107    private static final long serialVersionUID = 259835619700264611L;
108  }
109    
110  private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
111    
112
113  /**
114   * Creates a new instance of AbstractMetricsContext
115   */
116  protected AbstractMetricsContext() {
117  }
118    
119  /**
120   * Initializes the context.
121   */
122  public void init(String contextName, ContextFactory factory) 
123  {
124    this.contextName = contextName;
125    this.factory = factory;
126  }
127    
128  /**
129   * Convenience method for subclasses to access factory attributes.
130   */
131  protected String getAttribute(String attributeName) {
132    String factoryAttribute = contextName + "." + attributeName;
133    return (String) factory.getAttribute(factoryAttribute);  
134  }
135    
136  /**
137   * Returns an attribute-value map derived from the factory attributes
138   * by finding all factory attributes that begin with 
139   * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
140   * those attributes with the contextName and tableName stripped off.
141   */
142  protected Map<String,String> getAttributeTable(String tableName) {
143    String prefix = contextName + "." + tableName + ".";
144    Map<String,String> result = new HashMap<String,String>();
145    for (String attributeName : factory.getAttributeNames()) {
146      if (attributeName.startsWith(prefix)) {
147        String name = attributeName.substring(prefix.length());
148        String value = (String) factory.getAttribute(attributeName);
149        result.put(name, value);
150      }
151    }
152    return result;
153  }
154    
155  /**
156   * Returns the context name.
157   */
158  public String getContextName() {
159    return contextName;
160  }
161    
162  /**
163   * Returns the factory by which this context was created.
164   */
165  public ContextFactory getContextFactory() {
166    return factory;
167  }
168    
169  /**
170   * Starts or restarts monitoring, the emitting of metrics records.
171   */
172  public synchronized void startMonitoring()
173    throws IOException {
174    if (!isMonitoring) {
175      startTimer();
176      isMonitoring = true;
177    }
178  }
179    
180  /**
181   * Stops monitoring.  This does not free buffered data. 
182   * @see #close()
183   */
184  public synchronized void stopMonitoring() {
185    if (isMonitoring) {
186      stopTimer();
187      isMonitoring = false;
188    }
189  }
190    
191  /**
192   * Returns true if monitoring is currently in progress.
193   */
194  public boolean isMonitoring() {
195    return isMonitoring;
196  }
197    
198  /**
199   * Stops monitoring and frees buffered data, returning this
200   * object to its initial state.  
201   */
202  public synchronized void close() {
203    stopMonitoring();
204    clearUpdaters();
205  } 
206    
207  /**
208   * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
209   * Throws an exception if the metrics implementation is configured with a fixed
210   * set of record names and <code>recordName</code> is not in that set.
211   * 
212   * @param recordName the name of the record
213   * @throws MetricsException if recordName conflicts with configuration data
214   */
215  public final synchronized MetricsRecord createRecord(String recordName) {
216    if (bufferedData.get(recordName) == null) {
217      bufferedData.put(recordName, new RecordMap());
218    }
219    return newRecord(recordName);
220  }
221    
222  /**
223   * Subclasses should override this if they subclass MetricsRecordImpl.
224   * @param recordName the name of the record
225   * @return newly created instance of MetricsRecordImpl or subclass
226   */
227  protected MetricsRecord newRecord(String recordName) {
228    return new MetricsRecordImpl(recordName, this);
229  }
230    
231  /**
232   * Registers a callback to be called at time intervals determined by
233   * the configuration.
234   *
235   * @param updater object to be run periodically; it should update
236   * some metrics records 
237   */
238  public synchronized void registerUpdater(final Updater updater) {
239    if (!updaters.contains(updater)) {
240      updaters.add(updater);
241    }
242  }
243    
244  /**
245   * Removes a callback, if it exists.
246   *
247   * @param updater object to be removed from the callback list
248   */
249  public synchronized void unregisterUpdater(Updater updater) {
250    updaters.remove(updater);
251  }
252    
253  private synchronized void clearUpdaters() {
254    updaters.clear();
255  }
256    
257  /**
258   * Starts timer if it is not already started
259   */
260  private synchronized void startTimer() {
261    if (timer == null) {
262      timer = new Timer("Timer thread for monitoring " + getContextName(), 
263                        true);
264      TimerTask task = new TimerTask() {
265          public void run() {
266            try {
267              timerEvent();
268            }
269            catch (IOException ioe) {
270              ioe.printStackTrace();
271            }
272          }
273        };
274      long millis = period * 1000;
275      timer.scheduleAtFixedRate(task, millis, millis);
276    }
277  }
278    
279  /**
280   * Stops timer if it is running
281   */
282  private synchronized void stopTimer() {
283    if (timer != null) {
284      timer.cancel();
285      timer = null;
286    }
287  }
288    
289  /**
290   * Timer callback.
291   */
292  private void timerEvent() throws IOException {
293    if (isMonitoring) {
294      Collection<Updater> myUpdaters;
295      synchronized (this) {
296        myUpdaters = new ArrayList<Updater>(updaters);
297      }     
298      // Run all the registered updates without holding a lock
299      // on this context
300      for (Updater updater : myUpdaters) {
301        try {
302          updater.doUpdates(this);
303        }
304        catch (Throwable throwable) {
305          throwable.printStackTrace();
306        }
307      }
308      emitRecords();
309    }
310  }
311    
312  /**
313   *  Emits the records.
314   */
315  private synchronized void emitRecords() throws IOException {
316    for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
317      RecordMap recordMap = recordEntry.getValue();
318      synchronized (recordMap) {
319        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
320        for (Entry<TagMap, MetricMap> entry : entrySet) {
321          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
322          emitRecord(contextName, recordEntry.getKey(), outRec);
323        }
324      }
325    }
326    flush();
327  }
328  
329  /**
330   * Retrieves all the records managed by this MetricsContext.
331   * Useful for monitoring systems that are polling-based.
332   * @return A non-null collection of all monitoring records.
333   */
334  public synchronized Map<String, Collection<OutputRecord>> getAllRecords() {
335    Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>();
336    for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
337      RecordMap recordMap = recordEntry.getValue();
338      synchronized (recordMap) {
339        List<OutputRecord> records = new ArrayList<OutputRecord>();
340        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet();
341        for (Entry<TagMap, MetricMap> entry : entrySet) {
342          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
343          records.add(outRec);
344        }
345        out.put(recordEntry.getKey(), records);
346      }
347    }
348    return out;
349  }
350
351  /**
352   * Sends a record to the metrics system.
353   */
354  protected abstract void emitRecord(String contextName, String recordName, 
355                                     OutputRecord outRec) throws IOException;
356    
357  /**
358   * Called each period after all records have been emitted, this method does nothing.
359   * Subclasses may override it in order to perform some kind of flush.
360   */
361  protected void flush() throws IOException {
362  }
363    
364  /**
365   * Called by MetricsRecordImpl.update().  Creates or updates a row in
366   * the internal table of metric data.
367   */
368  protected void update(MetricsRecordImpl record) {
369    String recordName = record.getRecordName();
370    TagMap tagTable = record.getTagTable();
371    Map<String,MetricValue> metricUpdates = record.getMetricTable();
372        
373    RecordMap recordMap = getRecordMap(recordName);
374    synchronized (recordMap) {
375      MetricMap metricMap = recordMap.get(tagTable);
376      if (metricMap == null) {
377        metricMap = new MetricMap();
378        TagMap tagMap = new TagMap(tagTable); // clone tags
379        recordMap.put(tagMap, metricMap);
380      }
381
382      Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
383      for (Entry<String, MetricValue> entry : entrySet) {
384        String metricName = entry.getKey ();
385        MetricValue updateValue = entry.getValue ();
386        Number updateNumber = updateValue.getNumber();
387        Number currentNumber = metricMap.get(metricName);
388        if (currentNumber == null || updateValue.isAbsolute()) {
389          metricMap.put(metricName, updateNumber);
390        }
391        else {
392          Number newNumber = sum(updateNumber, currentNumber);
393          metricMap.put(metricName, newNumber);
394        }
395      }
396    }
397  }
398    
399  private synchronized RecordMap getRecordMap(String recordName) {
400    return bufferedData.get(recordName);
401  }
402    
403  /**
404   * Adds two numbers, coercing the second to the type of the first.
405   *
406   */
407  private Number sum(Number a, Number b) {
408    if (a instanceof Integer) {
409      return Integer.valueOf(a.intValue() + b.intValue());
410    }
411    else if (a instanceof Float) {
412      return new Float(a.floatValue() + b.floatValue());
413    }
414    else if (a instanceof Short) {
415      return Short.valueOf((short)(a.shortValue() + b.shortValue()));
416    }
417    else if (a instanceof Byte) {
418      return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
419    }
420    else if (a instanceof Long) {
421      return Long.valueOf((a.longValue() + b.longValue()));
422    }
423    else {
424      // should never happen
425      throw new MetricsException("Invalid number type");
426    }
427            
428  }
429    
430  /**
431   * Called by MetricsRecordImpl.remove().  Removes all matching rows in
432   * the internal table of metric data.  A row matches if it has the same
433   * tag names and values as record, but it may also have additional
434   * tags.
435   */    
436  protected void remove(MetricsRecordImpl record) {
437    String recordName = record.getRecordName();
438    TagMap tagTable = record.getTagTable();
439        
440    RecordMap recordMap = getRecordMap(recordName);
441    synchronized (recordMap) {
442      Iterator<TagMap> it = recordMap.keySet().iterator();
443      while (it.hasNext()) {
444        TagMap rowTags = it.next();
445        if (rowTags.containsAll(tagTable)) {
446          it.remove();
447        }
448      }
449    }
450  }
451    
452  /**
453   * Returns the timer period.
454   */
455  public int getPeriod() {
456    return period;
457  }
458    
459  /**
460   * Sets the timer period
461   */
462  protected void setPeriod(int period) {
463    this.period = period;
464  }
465  
466  /**
467   * If a period is set in the attribute passed in, override
468   * the default with it.
469   */
470  protected void parseAndSetPeriod(String attributeName) {
471    String periodStr = getAttribute(attributeName);
472    if (periodStr != null) {
473      int period = 0;
474      try {
475        period = Integer.parseInt(periodStr);
476      } catch (NumberFormatException nfe) {
477      }
478      if (period <= 0) {
479        throw new MetricsException("Invalid period: " + periodStr);
480      }
481      setPeriod(period);
482    }
483  }
484}