001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.metrics2.lib;
020
021import com.google.common.collect.Sets;
022import java.lang.ref.WeakReference;
023import java.lang.reflect.Method;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentLinkedDeque;
030import java.util.concurrent.ConcurrentMap;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.apache.hadoop.classification.InterfaceAudience;
034import org.apache.hadoop.classification.InterfaceStability;
035import org.apache.hadoop.metrics2.MetricsRecordBuilder;
036import org.apache.hadoop.metrics2.util.SampleStat;
037
038
039/**
040 * Helper class to manage a group of mutable rate metrics.
041 *
042 * Each thread will maintain a local rate count, and upon snapshot,
043 * these values will be aggregated into a global rate. This class
044 * should only be used for long running threads, as any metrics
045 * produced between the last snapshot and the death of a thread
046 * will be lost. This allows for significantly higher concurrency
047 * than {@link MutableRates}. See HADOOP-24420.
048 */
049@InterfaceAudience.Public
050@InterfaceStability.Evolving
051public class MutableRatesWithAggregation extends MutableMetric {
052  static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class);
053  private final Map<String, MutableRate> globalMetrics = new HashMap<>();
054  private final Set<Class<?>> protocolCache = Sets.newHashSet();
055
056  private final ConcurrentLinkedDeque<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>>
057      weakReferenceQueue = new ConcurrentLinkedDeque<>();
058  private final ThreadLocal<ConcurrentMap<String, ThreadSafeSampleStat>>
059      threadLocalMetricsMap = new ThreadLocal<>();
060
061  /**
062   * Initialize the registry with all the methods in a protocol
063   * so they all show up in the first snapshot.
064   * Convenient for JMX implementations.
065   * @param protocol the protocol class
066   */
067  public void init(Class<?> protocol) {
068    if (protocolCache.contains(protocol)) {
069      return;
070    }
071    protocolCache.add(protocol);
072    for (Method method : protocol.getDeclaredMethods()) {
073      String name = method.getName();
074      LOG.debug(name);
075      addMetricIfNotExists(name);
076    }
077  }
078
079  /**
080   * Add a rate sample for a rate metric.
081   * @param name of the rate metric
082   * @param elapsed time
083   */
084  public void add(String name, long elapsed) {
085    ConcurrentMap<String, ThreadSafeSampleStat> localStats =
086        threadLocalMetricsMap.get();
087    if (localStats == null) {
088      localStats = new ConcurrentHashMap<>();
089      threadLocalMetricsMap.set(localStats);
090      weakReferenceQueue.add(new WeakReference<>(localStats));
091    }
092    ThreadSafeSampleStat stat = localStats.get(name);
093    if (stat == null) {
094      stat = new ThreadSafeSampleStat();
095      localStats.put(name, stat);
096    }
097    stat.add(elapsed);
098  }
099
100  @Override
101  public synchronized void snapshot(MetricsRecordBuilder rb, boolean all) {
102    Iterator<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>> iter =
103        weakReferenceQueue.iterator();
104    while (iter.hasNext()) {
105      ConcurrentMap<String, ThreadSafeSampleStat> map = iter.next().get();
106      if (map == null) {
107        // Thread has died; clean up its state
108        iter.remove();
109      } else {
110        // Aggregate the thread's local samples into the global metrics
111        for (Map.Entry<String, ThreadSafeSampleStat> entry : map.entrySet()) {
112          String name = entry.getKey();
113          MutableRate globalMetric = addMetricIfNotExists(name);
114          entry.getValue().snapshotInto(globalMetric);
115        }
116      }
117    }
118    for (MutableRate globalMetric : globalMetrics.values()) {
119      globalMetric.snapshot(rb, all);
120    }
121  }
122
123  private synchronized MutableRate addMetricIfNotExists(String name) {
124    MutableRate metric = globalMetrics.get(name);
125    if (metric == null) {
126      metric = new MutableRate(name, name, false);
127      globalMetrics.put(name, metric);
128    }
129    return metric;
130  }
131
132  private static class ThreadSafeSampleStat {
133
134    private SampleStat stat = new SampleStat();
135
136    synchronized void add(double x) {
137      stat.add(x);
138    }
139
140    synchronized void snapshotInto(MutableRate metric) {
141      if (stat.numSamples() > 0) {
142        metric.add(stat.numSamples(), Math.round(stat.total()));
143        stat.reset();
144      }
145    }
146  }
147
148}