/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.lib;

import com.google.common.collect.Sets;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.util.SampleStat;


/**
 * Helper class to manage a group of mutable rate metrics.
 *
 * Each thread will maintain a local rate count, and upon snapshot,
 * these values will be aggregated into a global rate. This class
 * should only be used for long running threads, as any metrics
 * produced between the last snapshot and the death of a thread
 * will be lost. This allows for significantly higher concurrency
 * than {@link MutableRates}. See HADOOP-24420.
 *
 * <p>Thread-safety: {@link #add(String, long)} only touches the calling
 * thread's own map; {@link #snapshot(MetricsRecordBuilder, boolean)} and the
 * global metric registry are guarded by this object's monitor; the protocol
 * cache used by {@link #init(Class)} is a concurrent set.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MutableRatesWithAggregation extends MutableMetric {
  static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class);

  /** Aggregated rates by metric name; only touched by synchronized methods. */
  private final Map<String, MutableRate> globalMetrics = new HashMap<>();

  /**
   * Protocol classes already passed to {@link #init(Class)}. Backed by a
   * ConcurrentHashMap because init() is public and not synchronized, so it
   * may be entered by several threads at once.
   */
  private final Set<Class<?>> protocolCache =
      Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>());

  /**
   * Weak references to every per-thread sample map created by
   * {@link #add(String, long)}. This class holds a strong reference to each
   * map only through {@link #threadLocalMetricsMap}; entries whose referent
   * has been cleared are dropped during snapshot.
   */
  private final ConcurrentLinkedDeque<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>>
      weakReferenceQueue = new ConcurrentLinkedDeque<>();

  /** The calling thread's own metric-name to sample-stat map. */
  private final ThreadLocal<ConcurrentMap<String, ThreadSafeSampleStat>>
      threadLocalMetricsMap = new ThreadLocal<>();

  /**
   * Initialize the registry with all the methods in a protocol
   * so they all show up in the first snapshot.
   * Convenient for JMX implementations.
   *
   * <p>One metric is registered per distinct declared method name, so
   * overloaded methods share a metric. Repeated calls with the same
   * protocol class are no-ops.
   *
   * @param protocol the protocol class
   * @throws NullPointerException if {@code protocol} is null
   */
  public void init(Class<?> protocol) {
    // add() on the concurrent set is an atomic check-and-insert: it returns
    // false when the protocol was already recorded, so only the first caller
    // for a given class proceeds to register its methods.
    if (!protocolCache.add(protocol)) {
      return;
    }
    for (Method method : protocol.getDeclaredMethods()) {
      String name = method.getName();
      LOG.debug(name);
      addMetricIfNotExists(name);
    }
  }

  /**
   * Add a rate sample for a rate metric.
   *
   * <p>The sample is recorded in the calling thread's local map and only
   * reaches the global metric on the next snapshot.
   *
   * @param name of the rate metric
   * @param elapsed time
   */
  public void add(String name, long elapsed) {
    ConcurrentMap<String, ThreadSafeSampleStat> localStats =
        threadLocalMetricsMap.get();
    if (localStats == null) {
      // First sample from this thread: create its map and publish a weak
      // reference so snapshot() can find it.
      localStats = new ConcurrentHashMap<>();
      threadLocalMetricsMap.set(localStats);
      weakReferenceQueue.add(new WeakReference<>(localStats));
    }
    // Only the owning thread inserts into its map, so get-then-put cannot
    // race with another writer.
    ThreadSafeSampleStat stat = localStats.get(name);
    if (stat == null) {
      stat = new ThreadSafeSampleStat();
      localStats.put(name, stat);
    }
    stat.add(elapsed);
  }

  /**
   * Moves every reachable per-thread sample set into the matching global
   * rate (creating the global rate if needed), then snapshots each global
   * rate into the record builder.
   *
   * @param rb the record builder each global rate is snapshotted into
   * @param all passed through unchanged to each global rate's snapshot
   */
  @Override
  public synchronized void snapshot(MetricsRecordBuilder rb, boolean all) {
    Iterator<WeakReference<ConcurrentMap<String, ThreadSafeSampleStat>>> iter =
        weakReferenceQueue.iterator();
    while (iter.hasNext()) {
      ConcurrentMap<String, ThreadSafeSampleStat> map = iter.next().get();
      if (map == null) {
        // Thread has died; clean up its state
        iter.remove();
      } else {
        // Aggregate the thread's local samples into the global metrics
        for (Map.Entry<String, ThreadSafeSampleStat> entry : map.entrySet()) {
          String name = entry.getKey();
          MutableRate globalMetric = addMetricIfNotExists(name);
          entry.getValue().snapshotInto(globalMetric);
        }
      }
    }
    for (MutableRate globalMetric : globalMetrics.values()) {
      globalMetric.snapshot(rb, all);
    }
  }

  /**
   * Returns the global rate registered under {@code name}, creating and
   * registering it first if none exists.
   *
   * @param name the metric name; also passed as the second constructor
   *     argument of the new {@link MutableRate}
   * @return the existing or newly created global rate
   */
  private synchronized MutableRate addMetricIfNotExists(String name) {
    MutableRate metric = globalMetrics.get(name);
    if (metric == null) {
      metric = new MutableRate(name, name, false);
      globalMetrics.put(name, metric);
    }
    return metric;
  }

  /**
   * A {@link SampleStat} whose updates and drains are serialized on this
   * wrapper's monitor, so the owning thread can add samples while the
   * snapshot thread drains them.
   */
  private static class ThreadSafeSampleStat {

    private final SampleStat stat = new SampleStat();

    /**
     * Records one sample.
     *
     * @param x the sample value
     */
    synchronized void add(double x) {
      stat.add(x);
    }

    /**
     * Transfers the accumulated sample count and rounded total into
     * {@code metric} and resets this stat. Does nothing when no samples
     * have been recorded since the last transfer.
     *
     * @param metric the global rate receiving the accumulated samples
     */
    synchronized void snapshotInto(MutableRate metric) {
      if (stat.numSamples() > 0) {
        metric.add(stat.numSamples(), Math.round(stat.total()));
        stat.reset();
      }
    }
  }

}