001/* 002 * AbstractMetricsContext.java 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package org.apache.hadoop.metrics.spi; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.Timer; 033import java.util.TimerTask; 034import java.util.TreeMap; 035import java.util.Map.Entry; 036 037import org.apache.hadoop.classification.InterfaceAudience; 038import org.apache.hadoop.classification.InterfaceStability; 039import org.apache.hadoop.metrics.ContextFactory; 040import org.apache.hadoop.metrics.MetricsContext; 041import org.apache.hadoop.metrics.MetricsException; 042import org.apache.hadoop.metrics.MetricsRecord; 043import org.apache.hadoop.metrics.Updater; 044 045/** 046 * The main class of the Service Provider Interface. This class should be 047 * extended in order to integrate the Metrics API with a specific metrics 048 * client library. <p/> 049 * 050 * This class implements the internal table of metric data, and the timer 051 * on which data is to be sent to the metrics system. Subclasses must 052 * override the abstract <code>emitRecord</code> method in order to transmit 053 * the data. <p/> 054 * 055 * @deprecated Use org.apache.hadoop.metrics2 package instead. 056 */ 057@Deprecated 058@InterfaceAudience.Public 059@InterfaceStability.Evolving 060public abstract class AbstractMetricsContext implements MetricsContext { 061 062 private int period = MetricsContext.DEFAULT_PERIOD; 063 private Timer timer = null; 064 065 private Set<Updater> updaters = new HashSet<Updater>(1); 066 private volatile boolean isMonitoring = false; 067 068 private ContextFactory factory = null; 069 private String contextName = null; 070 071 @InterfaceAudience.Private 072 public static class TagMap extends TreeMap<String,Object> { 073 private static final long serialVersionUID = 3546309335061952993L; 074 TagMap() { 075 super(); 076 } 077 TagMap(TagMap orig) { 078 super(orig); 079 } 080 /** 081 * Returns true if this tagmap contains every tag in other. 082 */ 083 public boolean containsAll(TagMap other) { 084 for (Map.Entry<String,Object> entry : other.entrySet()) { 085 Object value = get(entry.getKey()); 086 if (value == null || !value.equals(entry.getValue())) { 087 // either key does not exist here, or the value is different 088 return false; 089 } 090 } 091 return true; 092 } 093 } 094 095 @InterfaceAudience.Private 096 public static class MetricMap extends TreeMap<String,Number> { 097 private static final long serialVersionUID = -7495051861141631609L; 098 MetricMap() { 099 super(); 100 } 101 MetricMap(MetricMap orig) { 102 super(orig); 103 } 104 } 105 106 static class RecordMap extends HashMap<TagMap,MetricMap> { 107 private static final long serialVersionUID = 259835619700264611L; 108 } 109 110 private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>(); 111 112 113 /** 114 * Creates a new instance of AbstractMetricsContext 115 */ 116 protected AbstractMetricsContext() { 117 } 118 119 /** 120 * Initializes the context. 121 */ 122 public void init(String contextName, ContextFactory factory) 123 { 124 this.contextName = contextName; 125 this.factory = factory; 126 } 127 128 /** 129 * Convenience method for subclasses to access factory attributes. 130 */ 131 protected String getAttribute(String attributeName) { 132 String factoryAttribute = contextName + "." + attributeName; 133 return (String) factory.getAttribute(factoryAttribute); 134 } 135 136 /** 137 * Returns an attribute-value map derived from the factory attributes 138 * by finding all factory attributes that begin with 139 * <i>contextName</i>.<i>tableName</i>. The returned map consists of 140 * those attributes with the contextName and tableName stripped off. 141 */ 142 protected Map<String,String> getAttributeTable(String tableName) { 143 String prefix = contextName + "." + tableName + "."; 144 Map<String,String> result = new HashMap<String,String>(); 145 for (String attributeName : factory.getAttributeNames()) { 146 if (attributeName.startsWith(prefix)) { 147 String name = attributeName.substring(prefix.length()); 148 String value = (String) factory.getAttribute(attributeName); 149 result.put(name, value); 150 } 151 } 152 return result; 153 } 154 155 /** 156 * Returns the context name. 157 */ 158 public String getContextName() { 159 return contextName; 160 } 161 162 /** 163 * Returns the factory by which this context was created. 164 */ 165 public ContextFactory getContextFactory() { 166 return factory; 167 } 168 169 /** 170 * Starts or restarts monitoring, the emitting of metrics records. 171 */ 172 public synchronized void startMonitoring() 173 throws IOException { 174 if (!isMonitoring) { 175 startTimer(); 176 isMonitoring = true; 177 } 178 } 179 180 /** 181 * Stops monitoring. This does not free buffered data. 182 * @see #close() 183 */ 184 public synchronized void stopMonitoring() { 185 if (isMonitoring) { 186 stopTimer(); 187 isMonitoring = false; 188 } 189 } 190 191 /** 192 * Returns true if monitoring is currently in progress. 193 */ 194 public boolean isMonitoring() { 195 return isMonitoring; 196 } 197 198 /** 199 * Stops monitoring and frees buffered data, returning this 200 * object to its initial state. 201 */ 202 public synchronized void close() { 203 stopMonitoring(); 204 clearUpdaters(); 205 } 206 207 /** 208 * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>. 209 * Throws an exception if the metrics implementation is configured with a fixed 210 * set of record names and <code>recordName</code> is not in that set. 211 * 212 * @param recordName the name of the record 213 * @throws MetricsException if recordName conflicts with configuration data 214 */ 215 public final synchronized MetricsRecord createRecord(String recordName) { 216 if (bufferedData.get(recordName) == null) { 217 bufferedData.put(recordName, new RecordMap()); 218 } 219 return newRecord(recordName); 220 } 221 222 /** 223 * Subclasses should override this if they subclass MetricsRecordImpl. 224 * @param recordName the name of the record 225 * @return newly created instance of MetricsRecordImpl or subclass 226 */ 227 protected MetricsRecord newRecord(String recordName) { 228 return new MetricsRecordImpl(recordName, this); 229 } 230 231 /** 232 * Registers a callback to be called at time intervals determined by 233 * the configuration. 234 * 235 * @param updater object to be run periodically; it should update 236 * some metrics records 237 */ 238 public synchronized void registerUpdater(final Updater updater) { 239 if (!updaters.contains(updater)) { 240 updaters.add(updater); 241 } 242 } 243 244 /** 245 * Removes a callback, if it exists. 246 * 247 * @param updater object to be removed from the callback list 248 */ 249 public synchronized void unregisterUpdater(Updater updater) { 250 updaters.remove(updater); 251 } 252 253 private synchronized void clearUpdaters() { 254 updaters.clear(); 255 } 256 257 /** 258 * Starts timer if it is not already started 259 */ 260 private synchronized void startTimer() { 261 if (timer == null) { 262 timer = new Timer("Timer thread for monitoring " + getContextName(), 263 true); 264 TimerTask task = new TimerTask() { 265 public void run() { 266 try { 267 timerEvent(); 268 } 269 catch (IOException ioe) { 270 ioe.printStackTrace(); 271 } 272 } 273 }; 274 long millis = period * 1000; 275 timer.scheduleAtFixedRate(task, millis, millis); 276 } 277 } 278 279 /** 280 * Stops timer if it is running 281 */ 282 private synchronized void stopTimer() { 283 if (timer != null) { 284 timer.cancel(); 285 timer = null; 286 } 287 } 288 289 /** 290 * Timer callback. 291 */ 292 private void timerEvent() throws IOException { 293 if (isMonitoring) { 294 Collection<Updater> myUpdaters; 295 synchronized (this) { 296 myUpdaters = new ArrayList<Updater>(updaters); 297 } 298 // Run all the registered updates without holding a lock 299 // on this context 300 for (Updater updater : myUpdaters) { 301 try { 302 updater.doUpdates(this); 303 } 304 catch (Throwable throwable) { 305 throwable.printStackTrace(); 306 } 307 } 308 emitRecords(); 309 } 310 } 311 312 /** 313 * Emits the records. 314 */ 315 private synchronized void emitRecords() throws IOException { 316 for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) { 317 RecordMap recordMap = recordEntry.getValue(); 318 synchronized (recordMap) { 319 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet (); 320 for (Entry<TagMap, MetricMap> entry : entrySet) { 321 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); 322 emitRecord(contextName, recordEntry.getKey(), outRec); 323 } 324 } 325 } 326 flush(); 327 } 328 329 /** 330 * Retrieves all the records managed by this MetricsContext. 331 * Useful for monitoring systems that are polling-based. 332 * @return A non-null collection of all monitoring records. 333 */ 334 public synchronized Map<String, Collection<OutputRecord>> getAllRecords() { 335 Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>(); 336 for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) { 337 RecordMap recordMap = recordEntry.getValue(); 338 synchronized (recordMap) { 339 List<OutputRecord> records = new ArrayList<OutputRecord>(); 340 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet(); 341 for (Entry<TagMap, MetricMap> entry : entrySet) { 342 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); 343 records.add(outRec); 344 } 345 out.put(recordEntry.getKey(), records); 346 } 347 } 348 return out; 349 } 350 351 /** 352 * Sends a record to the metrics system. 353 */ 354 protected abstract void emitRecord(String contextName, String recordName, 355 OutputRecord outRec) throws IOException; 356 357 /** 358 * Called each period after all records have been emitted, this method does nothing. 359 * Subclasses may override it in order to perform some kind of flush. 360 */ 361 protected void flush() throws IOException { 362 } 363 364 /** 365 * Called by MetricsRecordImpl.update(). Creates or updates a row in 366 * the internal table of metric data. 367 */ 368 protected void update(MetricsRecordImpl record) { 369 String recordName = record.getRecordName(); 370 TagMap tagTable = record.getTagTable(); 371 Map<String,MetricValue> metricUpdates = record.getMetricTable(); 372 373 RecordMap recordMap = getRecordMap(recordName); 374 synchronized (recordMap) { 375 MetricMap metricMap = recordMap.get(tagTable); 376 if (metricMap == null) { 377 metricMap = new MetricMap(); 378 TagMap tagMap = new TagMap(tagTable); // clone tags 379 recordMap.put(tagMap, metricMap); 380 } 381 382 Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet(); 383 for (Entry<String, MetricValue> entry : entrySet) { 384 String metricName = entry.getKey (); 385 MetricValue updateValue = entry.getValue (); 386 Number updateNumber = updateValue.getNumber(); 387 Number currentNumber = metricMap.get(metricName); 388 if (currentNumber == null || updateValue.isAbsolute()) { 389 metricMap.put(metricName, updateNumber); 390 } 391 else { 392 Number newNumber = sum(updateNumber, currentNumber); 393 metricMap.put(metricName, newNumber); 394 } 395 } 396 } 397 } 398 399 private synchronized RecordMap getRecordMap(String recordName) { 400 return bufferedData.get(recordName); 401 } 402 403 /** 404 * Adds two numbers, coercing the second to the type of the first. 405 * 406 */ 407 private Number sum(Number a, Number b) { 408 if (a instanceof Integer) { 409 return Integer.valueOf(a.intValue() + b.intValue()); 410 } 411 else if (a instanceof Float) { 412 return new Float(a.floatValue() + b.floatValue()); 413 } 414 else if (a instanceof Short) { 415 return Short.valueOf((short)(a.shortValue() + b.shortValue())); 416 } 417 else if (a instanceof Byte) { 418 return Byte.valueOf((byte)(a.byteValue() + b.byteValue())); 419 } 420 else if (a instanceof Long) { 421 return Long.valueOf((a.longValue() + b.longValue())); 422 } 423 else { 424 // should never happen 425 throw new MetricsException("Invalid number type"); 426 } 427 428 } 429 430 /** 431 * Called by MetricsRecordImpl.remove(). Removes all matching rows in 432 * the internal table of metric data. A row matches if it has the same 433 * tag names and values as record, but it may also have additional 434 * tags. 435 */ 436 protected void remove(MetricsRecordImpl record) { 437 String recordName = record.getRecordName(); 438 TagMap tagTable = record.getTagTable(); 439 440 RecordMap recordMap = getRecordMap(recordName); 441 synchronized (recordMap) { 442 Iterator<TagMap> it = recordMap.keySet().iterator(); 443 while (it.hasNext()) { 444 TagMap rowTags = it.next(); 445 if (rowTags.containsAll(tagTable)) { 446 it.remove(); 447 } 448 } 449 } 450 } 451 452 /** 453 * Returns the timer period. 454 */ 455 public int getPeriod() { 456 return period; 457 } 458 459 /** 460 * Sets the timer period 461 */ 462 protected void setPeriod(int period) { 463 this.period = period; 464 } 465 466 /** 467 * If a period is set in the attribute passed in, override 468 * the default with it. 469 */ 470 protected void parseAndSetPeriod(String attributeName) { 471 String periodStr = getAttribute(attributeName); 472 if (periodStr != null) { 473 int period = 0; 474 try { 475 period = Integer.parseInt(periodStr); 476 } catch (NumberFormatException nfe) { 477 } 478 if (period <= 0) { 479 throw new MetricsException("Invalid period: " + periodStr); 480 } 481 setPeriod(period); 482 } 483 } 484}