001 /*
002 * AbstractMetricsContext.java
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021 package org.apache.hadoop.metrics.spi;
022
023 import java.io.IOException;
024 import java.util.ArrayList;
025 import java.util.Collection;
026 import java.util.HashMap;
027 import java.util.HashSet;
028 import java.util.Iterator;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032 import java.util.Timer;
033 import java.util.TimerTask;
034 import java.util.TreeMap;
035 import java.util.Map.Entry;
036
037 import org.apache.hadoop.classification.InterfaceAudience;
038 import org.apache.hadoop.classification.InterfaceStability;
039 import org.apache.hadoop.metrics.ContextFactory;
040 import org.apache.hadoop.metrics.MetricsContext;
041 import org.apache.hadoop.metrics.MetricsException;
042 import org.apache.hadoop.metrics.MetricsRecord;
043 import org.apache.hadoop.metrics.Updater;
044
045 /**
046 * The main class of the Service Provider Interface. This class should be
047 * extended in order to integrate the Metrics API with a specific metrics
048 * client library. <p/>
049 *
050 * This class implements the internal table of metric data, and the timer
051 * on which data is to be sent to the metrics system. Subclasses must
052 * override the abstract <code>emitRecord</code> method in order to transmit
053 * the data. <p/>
054 */
055 @InterfaceAudience.Public
056 @InterfaceStability.Evolving
057 public abstract class AbstractMetricsContext implements MetricsContext {
058
059 private int period = MetricsContext.DEFAULT_PERIOD;
060 private Timer timer = null;
061
062 private Set<Updater> updaters = new HashSet<Updater>(1);
063 private volatile boolean isMonitoring = false;
064
065 private ContextFactory factory = null;
066 private String contextName = null;
067
068 @InterfaceAudience.Private
069 public static class TagMap extends TreeMap<String,Object> {
070 private static final long serialVersionUID = 3546309335061952993L;
071 TagMap() {
072 super();
073 }
074 TagMap(TagMap orig) {
075 super(orig);
076 }
077 /**
078 * Returns true if this tagmap contains every tag in other.
079 */
080 public boolean containsAll(TagMap other) {
081 for (Map.Entry<String,Object> entry : other.entrySet()) {
082 Object value = get(entry.getKey());
083 if (value == null || !value.equals(entry.getValue())) {
084 // either key does not exist here, or the value is different
085 return false;
086 }
087 }
088 return true;
089 }
090 }
091
092 @InterfaceAudience.Private
093 public static class MetricMap extends TreeMap<String,Number> {
094 private static final long serialVersionUID = -7495051861141631609L;
095 MetricMap() {
096 super();
097 }
098 MetricMap(MetricMap orig) {
099 super(orig);
100 }
101 }
102
103 static class RecordMap extends HashMap<TagMap,MetricMap> {
104 private static final long serialVersionUID = 259835619700264611L;
105 }
106
107 private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
108
109
/**
 * Constructs a context whose name and factory are still unset;
 * {@link #init(String, ContextFactory)} supplies both.
 */
protected AbstractMetricsContext() {
}

/**
 * Initializes the context by remembering its name and its factory.
 *
 * @param contextName the name of this context
 * @param factory the factory that attribute lookups are delegated to
 */
public void init(String contextName, ContextFactory factory) {
  this.factory = factory;
  this.contextName = contextName;
}
124
125 /**
126 * Convenience method for subclasses to access factory attributes.
127 */
128 protected String getAttribute(String attributeName) {
129 String factoryAttribute = contextName + "." + attributeName;
130 return (String) factory.getAttribute(factoryAttribute);
131 }
132
133 /**
134 * Returns an attribute-value map derived from the factory attributes
135 * by finding all factory attributes that begin with
136 * <i>contextName</i>.<i>tableName</i>. The returned map consists of
137 * those attributes with the contextName and tableName stripped off.
138 */
139 protected Map<String,String> getAttributeTable(String tableName) {
140 String prefix = contextName + "." + tableName + ".";
141 Map<String,String> result = new HashMap<String,String>();
142 for (String attributeName : factory.getAttributeNames()) {
143 if (attributeName.startsWith(prefix)) {
144 String name = attributeName.substring(prefix.length());
145 String value = (String) factory.getAttribute(attributeName);
146 result.put(name, value);
147 }
148 }
149 return result;
150 }
151
152 /**
153 * Returns the context name.
154 */
155 public String getContextName() {
156 return contextName;
157 }
158
159 /**
160 * Returns the factory by which this context was created.
161 */
162 public ContextFactory getContextFactory() {
163 return factory;
164 }
165
166 /**
167 * Starts or restarts monitoring, the emitting of metrics records.
168 */
169 public synchronized void startMonitoring()
170 throws IOException {
171 if (!isMonitoring) {
172 startTimer();
173 isMonitoring = true;
174 }
175 }
176
177 /**
178 * Stops monitoring. This does not free buffered data.
179 * @see #close()
180 */
181 public synchronized void stopMonitoring() {
182 if (isMonitoring) {
183 stopTimer();
184 isMonitoring = false;
185 }
186 }
187
188 /**
189 * Returns true if monitoring is currently in progress.
190 */
191 public boolean isMonitoring() {
192 return isMonitoring;
193 }
194
195 /**
196 * Stops monitoring and frees buffered data, returning this
197 * object to its initial state.
198 */
199 public synchronized void close() {
200 stopMonitoring();
201 clearUpdaters();
202 }
203
204 /**
205 * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
206 * Throws an exception if the metrics implementation is configured with a fixed
207 * set of record names and <code>recordName</code> is not in that set.
208 *
209 * @param recordName the name of the record
210 * @throws MetricsException if recordName conflicts with configuration data
211 */
212 public final synchronized MetricsRecord createRecord(String recordName) {
213 if (bufferedData.get(recordName) == null) {
214 bufferedData.put(recordName, new RecordMap());
215 }
216 return newRecord(recordName);
217 }
218
219 /**
220 * Subclasses should override this if they subclass MetricsRecordImpl.
221 * @param recordName the name of the record
222 * @return newly created instance of MetricsRecordImpl or subclass
223 */
224 protected MetricsRecord newRecord(String recordName) {
225 return new MetricsRecordImpl(recordName, this);
226 }
227
228 /**
229 * Registers a callback to be called at time intervals determined by
230 * the configuration.
231 *
232 * @param updater object to be run periodically; it should update
233 * some metrics records
234 */
235 public synchronized void registerUpdater(final Updater updater) {
236 if (!updaters.contains(updater)) {
237 updaters.add(updater);
238 }
239 }
240
241 /**
242 * Removes a callback, if it exists.
243 *
244 * @param updater object to be removed from the callback list
245 */
246 public synchronized void unregisterUpdater(Updater updater) {
247 updaters.remove(updater);
248 }
249
250 private synchronized void clearUpdaters() {
251 updaters.clear();
252 }
253
254 /**
255 * Starts timer if it is not already started
256 */
257 private synchronized void startTimer() {
258 if (timer == null) {
259 timer = new Timer("Timer thread for monitoring " + getContextName(),
260 true);
261 TimerTask task = new TimerTask() {
262 public void run() {
263 try {
264 timerEvent();
265 }
266 catch (IOException ioe) {
267 ioe.printStackTrace();
268 }
269 }
270 };
271 long millis = period * 1000;
272 timer.scheduleAtFixedRate(task, millis, millis);
273 }
274 }
275
276 /**
277 * Stops timer if it is running
278 */
279 private synchronized void stopTimer() {
280 if (timer != null) {
281 timer.cancel();
282 timer = null;
283 }
284 }
285
286 /**
287 * Timer callback.
288 */
289 private void timerEvent() throws IOException {
290 if (isMonitoring) {
291 Collection<Updater> myUpdaters;
292 synchronized (this) {
293 myUpdaters = new ArrayList<Updater>(updaters);
294 }
295 // Run all the registered updates without holding a lock
296 // on this context
297 for (Updater updater : myUpdaters) {
298 try {
299 updater.doUpdates(this);
300 }
301 catch (Throwable throwable) {
302 throwable.printStackTrace();
303 }
304 }
305 emitRecords();
306 }
307 }
308
309 /**
310 * Emits the records.
311 */
312 private synchronized void emitRecords() throws IOException {
313 for (String recordName : bufferedData.keySet()) {
314 RecordMap recordMap = bufferedData.get(recordName);
315 synchronized (recordMap) {
316 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
317 for (Entry<TagMap, MetricMap> entry : entrySet) {
318 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
319 emitRecord(contextName, recordName, outRec);
320 }
321 }
322 }
323 flush();
324 }
325
326 /**
327 * Retrieves all the records managed by this MetricsContext.
328 * Useful for monitoring systems that are polling-based.
329 * @return A non-null collection of all monitoring records.
330 */
331 public synchronized Map<String, Collection<OutputRecord>> getAllRecords() {
332 Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>();
333 for (String recordName : bufferedData.keySet()) {
334 RecordMap recordMap = bufferedData.get(recordName);
335 synchronized (recordMap) {
336 List<OutputRecord> records = new ArrayList<OutputRecord>();
337 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet();
338 for (Entry<TagMap, MetricMap> entry : entrySet) {
339 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
340 records.add(outRec);
341 }
342 out.put(recordName, records);
343 }
344 }
345 return out;
346 }
347
348 /**
349 * Sends a record to the metrics system.
350 */
351 protected abstract void emitRecord(String contextName, String recordName,
352 OutputRecord outRec) throws IOException;
353
354 /**
355 * Called each period after all records have been emitted, this method does nothing.
356 * Subclasses may override it in order to perform some kind of flush.
357 */
358 protected void flush() throws IOException {
359 }
360
361 /**
362 * Called by MetricsRecordImpl.update(). Creates or updates a row in
363 * the internal table of metric data.
364 */
365 protected void update(MetricsRecordImpl record) {
366 String recordName = record.getRecordName();
367 TagMap tagTable = record.getTagTable();
368 Map<String,MetricValue> metricUpdates = record.getMetricTable();
369
370 RecordMap recordMap = getRecordMap(recordName);
371 synchronized (recordMap) {
372 MetricMap metricMap = recordMap.get(tagTable);
373 if (metricMap == null) {
374 metricMap = new MetricMap();
375 TagMap tagMap = new TagMap(tagTable); // clone tags
376 recordMap.put(tagMap, metricMap);
377 }
378
379 Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
380 for (Entry<String, MetricValue> entry : entrySet) {
381 String metricName = entry.getKey ();
382 MetricValue updateValue = entry.getValue ();
383 Number updateNumber = updateValue.getNumber();
384 Number currentNumber = metricMap.get(metricName);
385 if (currentNumber == null || updateValue.isAbsolute()) {
386 metricMap.put(metricName, updateNumber);
387 }
388 else {
389 Number newNumber = sum(updateNumber, currentNumber);
390 metricMap.put(metricName, newNumber);
391 }
392 }
393 }
394 }
395
396 private synchronized RecordMap getRecordMap(String recordName) {
397 return bufferedData.get(recordName);
398 }
399
400 /**
401 * Adds two numbers, coercing the second to the type of the first.
402 *
403 */
404 private Number sum(Number a, Number b) {
405 if (a instanceof Integer) {
406 return Integer.valueOf(a.intValue() + b.intValue());
407 }
408 else if (a instanceof Float) {
409 return new Float(a.floatValue() + b.floatValue());
410 }
411 else if (a instanceof Short) {
412 return Short.valueOf((short)(a.shortValue() + b.shortValue()));
413 }
414 else if (a instanceof Byte) {
415 return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
416 }
417 else if (a instanceof Long) {
418 return Long.valueOf((a.longValue() + b.longValue()));
419 }
420 else {
421 // should never happen
422 throw new MetricsException("Invalid number type");
423 }
424
425 }
426
427 /**
428 * Called by MetricsRecordImpl.remove(). Removes all matching rows in
429 * the internal table of metric data. A row matches if it has the same
430 * tag names and values as record, but it may also have additional
431 * tags.
432 */
433 protected void remove(MetricsRecordImpl record) {
434 String recordName = record.getRecordName();
435 TagMap tagTable = record.getTagTable();
436
437 RecordMap recordMap = getRecordMap(recordName);
438 synchronized (recordMap) {
439 Iterator<TagMap> it = recordMap.keySet().iterator();
440 while (it.hasNext()) {
441 TagMap rowTags = it.next();
442 if (rowTags.containsAll(tagTable)) {
443 it.remove();
444 }
445 }
446 }
447 }
448
449 /**
450 * Returns the timer period.
451 */
452 public int getPeriod() {
453 return period;
454 }
455
456 /**
457 * Sets the timer period
458 */
459 protected void setPeriod(int period) {
460 this.period = period;
461 }
462
463 /**
464 * If a period is set in the attribute passed in, override
465 * the default with it.
466 */
467 protected void parseAndSetPeriod(String attributeName) {
468 String periodStr = getAttribute(attributeName);
469 if (periodStr != null) {
470 int period = 0;
471 try {
472 period = Integer.parseInt(periodStr);
473 } catch (NumberFormatException nfe) {
474 }
475 if (period <= 0) {
476 throw new MetricsException("Invalid period: " + periodStr);
477 }
478 setPeriod(period);
479 }
480 }
481 }