001/* 002 * AbstractMetricsContext.java 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package org.apache.hadoop.metrics.spi; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.Timer; 033import java.util.TimerTask; 034import java.util.TreeMap; 035import java.util.Map.Entry; 036 037import org.apache.hadoop.classification.InterfaceAudience; 038import org.apache.hadoop.classification.InterfaceStability; 039import org.apache.hadoop.metrics.ContextFactory; 040import org.apache.hadoop.metrics.MetricsContext; 041import org.apache.hadoop.metrics.MetricsException; 042import org.apache.hadoop.metrics.MetricsRecord; 043import org.apache.hadoop.metrics.Updater; 044 045/** 046 * The main class of the Service Provider Interface. This class should be 047 * extended in order to integrate the Metrics API with a specific metrics 048 * client library. <p/> 049 * 050 * This class implements the internal table of metric data, and the timer 051 * on which data is to be sent to the metrics system. Subclasses must 052 * override the abstract <code>emitRecord</code> method in order to transmit 053 * the data. <p/> 054 */ 055@InterfaceAudience.Public 056@InterfaceStability.Evolving 057public abstract class AbstractMetricsContext implements MetricsContext { 058 059 private int period = MetricsContext.DEFAULT_PERIOD; 060 private Timer timer = null; 061 062 private Set<Updater> updaters = new HashSet<Updater>(1); 063 private volatile boolean isMonitoring = false; 064 065 private ContextFactory factory = null; 066 private String contextName = null; 067 068 @InterfaceAudience.Private 069 public static class TagMap extends TreeMap<String,Object> { 070 private static final long serialVersionUID = 3546309335061952993L; 071 TagMap() { 072 super(); 073 } 074 TagMap(TagMap orig) { 075 super(orig); 076 } 077 /** 078 * Returns true if this tagmap contains every tag in other. 079 */ 080 public boolean containsAll(TagMap other) { 081 for (Map.Entry<String,Object> entry : other.entrySet()) { 082 Object value = get(entry.getKey()); 083 if (value == null || !value.equals(entry.getValue())) { 084 // either key does not exist here, or the value is different 085 return false; 086 } 087 } 088 return true; 089 } 090 } 091 092 @InterfaceAudience.Private 093 public static class MetricMap extends TreeMap<String,Number> { 094 private static final long serialVersionUID = -7495051861141631609L; 095 MetricMap() { 096 super(); 097 } 098 MetricMap(MetricMap orig) { 099 super(orig); 100 } 101 } 102 103 static class RecordMap extends HashMap<TagMap,MetricMap> { 104 private static final long serialVersionUID = 259835619700264611L; 105 } 106 107 private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>(); 108 109 110 /** 111 * Creates a new instance of AbstractMetricsContext 112 */ 113 protected AbstractMetricsContext() { 114 } 115 116 /** 117 * Initializes the context. 118 */ 119 public void init(String contextName, ContextFactory factory) 120 { 121 this.contextName = contextName; 122 this.factory = factory; 123 } 124 125 /** 126 * Convenience method for subclasses to access factory attributes. 127 */ 128 protected String getAttribute(String attributeName) { 129 String factoryAttribute = contextName + "." + attributeName; 130 return (String) factory.getAttribute(factoryAttribute); 131 } 132 133 /** 134 * Returns an attribute-value map derived from the factory attributes 135 * by finding all factory attributes that begin with 136 * <i>contextName</i>.<i>tableName</i>. The returned map consists of 137 * those attributes with the contextName and tableName stripped off. 138 */ 139 protected Map<String,String> getAttributeTable(String tableName) { 140 String prefix = contextName + "." + tableName + "."; 141 Map<String,String> result = new HashMap<String,String>(); 142 for (String attributeName : factory.getAttributeNames()) { 143 if (attributeName.startsWith(prefix)) { 144 String name = attributeName.substring(prefix.length()); 145 String value = (String) factory.getAttribute(attributeName); 146 result.put(name, value); 147 } 148 } 149 return result; 150 } 151 152 /** 153 * Returns the context name. 154 */ 155 public String getContextName() { 156 return contextName; 157 } 158 159 /** 160 * Returns the factory by which this context was created. 161 */ 162 public ContextFactory getContextFactory() { 163 return factory; 164 } 165 166 /** 167 * Starts or restarts monitoring, the emitting of metrics records. 168 */ 169 public synchronized void startMonitoring() 170 throws IOException { 171 if (!isMonitoring) { 172 startTimer(); 173 isMonitoring = true; 174 } 175 } 176 177 /** 178 * Stops monitoring. This does not free buffered data. 179 * @see #close() 180 */ 181 public synchronized void stopMonitoring() { 182 if (isMonitoring) { 183 stopTimer(); 184 isMonitoring = false; 185 } 186 } 187 188 /** 189 * Returns true if monitoring is currently in progress. 190 */ 191 public boolean isMonitoring() { 192 return isMonitoring; 193 } 194 195 /** 196 * Stops monitoring and frees buffered data, returning this 197 * object to its initial state. 198 */ 199 public synchronized void close() { 200 stopMonitoring(); 201 clearUpdaters(); 202 } 203 204 /** 205 * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>. 206 * Throws an exception if the metrics implementation is configured with a fixed 207 * set of record names and <code>recordName</code> is not in that set. 208 * 209 * @param recordName the name of the record 210 * @throws MetricsException if recordName conflicts with configuration data 211 */ 212 public final synchronized MetricsRecord createRecord(String recordName) { 213 if (bufferedData.get(recordName) == null) { 214 bufferedData.put(recordName, new RecordMap()); 215 } 216 return newRecord(recordName); 217 } 218 219 /** 220 * Subclasses should override this if they subclass MetricsRecordImpl. 221 * @param recordName the name of the record 222 * @return newly created instance of MetricsRecordImpl or subclass 223 */ 224 protected MetricsRecord newRecord(String recordName) { 225 return new MetricsRecordImpl(recordName, this); 226 } 227 228 /** 229 * Registers a callback to be called at time intervals determined by 230 * the configuration. 231 * 232 * @param updater object to be run periodically; it should update 233 * some metrics records 234 */ 235 public synchronized void registerUpdater(final Updater updater) { 236 if (!updaters.contains(updater)) { 237 updaters.add(updater); 238 } 239 } 240 241 /** 242 * Removes a callback, if it exists. 243 * 244 * @param updater object to be removed from the callback list 245 */ 246 public synchronized void unregisterUpdater(Updater updater) { 247 updaters.remove(updater); 248 } 249 250 private synchronized void clearUpdaters() { 251 updaters.clear(); 252 } 253 254 /** 255 * Starts timer if it is not already started 256 */ 257 private synchronized void startTimer() { 258 if (timer == null) { 259 timer = new Timer("Timer thread for monitoring " + getContextName(), 260 true); 261 TimerTask task = new TimerTask() { 262 public void run() { 263 try { 264 timerEvent(); 265 } 266 catch (IOException ioe) { 267 ioe.printStackTrace(); 268 } 269 } 270 }; 271 long millis = period * 1000; 272 timer.scheduleAtFixedRate(task, millis, millis); 273 } 274 } 275 276 /** 277 * Stops timer if it is running 278 */ 279 private synchronized void stopTimer() { 280 if (timer != null) { 281 timer.cancel(); 282 timer = null; 283 } 284 } 285 286 /** 287 * Timer callback. 288 */ 289 private void timerEvent() throws IOException { 290 if (isMonitoring) { 291 Collection<Updater> myUpdaters; 292 synchronized (this) { 293 myUpdaters = new ArrayList<Updater>(updaters); 294 } 295 // Run all the registered updates without holding a lock 296 // on this context 297 for (Updater updater : myUpdaters) { 298 try { 299 updater.doUpdates(this); 300 } 301 catch (Throwable throwable) { 302 throwable.printStackTrace(); 303 } 304 } 305 emitRecords(); 306 } 307 } 308 309 /** 310 * Emits the records. 311 */ 312 private synchronized void emitRecords() throws IOException { 313 for (String recordName : bufferedData.keySet()) { 314 RecordMap recordMap = bufferedData.get(recordName); 315 synchronized (recordMap) { 316 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet (); 317 for (Entry<TagMap, MetricMap> entry : entrySet) { 318 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); 319 emitRecord(contextName, recordName, outRec); 320 } 321 } 322 } 323 flush(); 324 } 325 326 /** 327 * Retrieves all the records managed by this MetricsContext. 328 * Useful for monitoring systems that are polling-based. 329 * @return A non-null collection of all monitoring records. 330 */ 331 public synchronized Map<String, Collection<OutputRecord>> getAllRecords() { 332 Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>(); 333 for (String recordName : bufferedData.keySet()) { 334 RecordMap recordMap = bufferedData.get(recordName); 335 synchronized (recordMap) { 336 List<OutputRecord> records = new ArrayList<OutputRecord>(); 337 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet(); 338 for (Entry<TagMap, MetricMap> entry : entrySet) { 339 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); 340 records.add(outRec); 341 } 342 out.put(recordName, records); 343 } 344 } 345 return out; 346 } 347 348 /** 349 * Sends a record to the metrics system. 350 */ 351 protected abstract void emitRecord(String contextName, String recordName, 352 OutputRecord outRec) throws IOException; 353 354 /** 355 * Called each period after all records have been emitted, this method does nothing. 356 * Subclasses may override it in order to perform some kind of flush. 357 */ 358 protected void flush() throws IOException { 359 } 360 361 /** 362 * Called by MetricsRecordImpl.update(). Creates or updates a row in 363 * the internal table of metric data. 364 */ 365 protected void update(MetricsRecordImpl record) { 366 String recordName = record.getRecordName(); 367 TagMap tagTable = record.getTagTable(); 368 Map<String,MetricValue> metricUpdates = record.getMetricTable(); 369 370 RecordMap recordMap = getRecordMap(recordName); 371 synchronized (recordMap) { 372 MetricMap metricMap = recordMap.get(tagTable); 373 if (metricMap == null) { 374 metricMap = new MetricMap(); 375 TagMap tagMap = new TagMap(tagTable); // clone tags 376 recordMap.put(tagMap, metricMap); 377 } 378 379 Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet(); 380 for (Entry<String, MetricValue> entry : entrySet) { 381 String metricName = entry.getKey (); 382 MetricValue updateValue = entry.getValue (); 383 Number updateNumber = updateValue.getNumber(); 384 Number currentNumber = metricMap.get(metricName); 385 if (currentNumber == null || updateValue.isAbsolute()) { 386 metricMap.put(metricName, updateNumber); 387 } 388 else { 389 Number newNumber = sum(updateNumber, currentNumber); 390 metricMap.put(metricName, newNumber); 391 } 392 } 393 } 394 } 395 396 private synchronized RecordMap getRecordMap(String recordName) { 397 return bufferedData.get(recordName); 398 } 399 400 /** 401 * Adds two numbers, coercing the second to the type of the first. 402 * 403 */ 404 private Number sum(Number a, Number b) { 405 if (a instanceof Integer) { 406 return Integer.valueOf(a.intValue() + b.intValue()); 407 } 408 else if (a instanceof Float) { 409 return new Float(a.floatValue() + b.floatValue()); 410 } 411 else if (a instanceof Short) { 412 return Short.valueOf((short)(a.shortValue() + b.shortValue())); 413 } 414 else if (a instanceof Byte) { 415 return Byte.valueOf((byte)(a.byteValue() + b.byteValue())); 416 } 417 else if (a instanceof Long) { 418 return Long.valueOf((a.longValue() + b.longValue())); 419 } 420 else { 421 // should never happen 422 throw new MetricsException("Invalid number type"); 423 } 424 425 } 426 427 /** 428 * Called by MetricsRecordImpl.remove(). Removes all matching rows in 429 * the internal table of metric data. A row matches if it has the same 430 * tag names and values as record, but it may also have additional 431 * tags. 432 */ 433 protected void remove(MetricsRecordImpl record) { 434 String recordName = record.getRecordName(); 435 TagMap tagTable = record.getTagTable(); 436 437 RecordMap recordMap = getRecordMap(recordName); 438 synchronized (recordMap) { 439 Iterator<TagMap> it = recordMap.keySet().iterator(); 440 while (it.hasNext()) { 441 TagMap rowTags = it.next(); 442 if (rowTags.containsAll(tagTable)) { 443 it.remove(); 444 } 445 } 446 } 447 } 448 449 /** 450 * Returns the timer period. 451 */ 452 public int getPeriod() { 453 return period; 454 } 455 456 /** 457 * Sets the timer period 458 */ 459 protected void setPeriod(int period) { 460 this.period = period; 461 } 462 463 /** 464 * If a period is set in the attribute passed in, override 465 * the default with it. 466 */ 467 protected void parseAndSetPeriod(String attributeName) { 468 String periodStr = getAttribute(attributeName); 469 if (periodStr != null) { 470 int period = 0; 471 try { 472 period = Integer.parseInt(periodStr); 473 } catch (NumberFormatException nfe) { 474 } 475 if (period <= 0) { 476 throw new MetricsException("Invalid period: " + periodStr); 477 } 478 setPeriod(period); 479 } 480 } 481}