001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.metrics2.util;
020    
021    import java.util.Collection;
022    import java.util.LinkedHashMap;
023    import java.util.Map;
024    import java.util.Set;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.metrics2.AbstractMetric;
031    import org.apache.hadoop.metrics2.MetricsRecord;
032    import org.apache.hadoop.metrics2.MetricsTag;
033    
034    import com.google.common.base.Objects;
035    import com.google.common.collect.Maps;
036    
037    /**
038     * A metrics cache for sinks that don't support sparse updates.
039     */
040    @InterfaceAudience.Public
041    @InterfaceStability.Evolving
042    public class MetricsCache {
043      static final Log LOG = LogFactory.getLog(MetricsCache.class);
044      static final int MAX_RECS_PER_NAME_DEFAULT = 1000;
045    
046      private final Map<String, RecordCache> map = Maps.newHashMap();
047      private final int maxRecsPerName;
048    
049      class RecordCache
050          extends LinkedHashMap<Collection<MetricsTag>, Record> {
051        private static final long serialVersionUID = 1L;
052        private boolean gotOverflow = false;
053    
054        @Override
055        protected boolean removeEldestEntry(Map.Entry<Collection<MetricsTag>,
056                                                      Record> eldest) {
057          boolean overflow = size() > maxRecsPerName;
058          if (overflow && !gotOverflow) {
059            LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest);
060            gotOverflow = true;
061          }
062          return overflow;
063        }
064      }
065    
066      /**
067       * Cached record
068       */
069      public static class Record {
070        final Map<String, String> tags = Maps.newHashMap();
071        final Map<String, AbstractMetric> metrics = Maps.newHashMap();
072    
073        /**
074         * Lookup a tag value
075         * @param key name of the tag
076         * @return the tag value
077         */
078        public String getTag(String key) {
079          return tags.get(key);
080        }
081    
082        /**
083         * Lookup a metric value
084         * @param key name of the metric
085         * @return the metric value
086         */
087        public Number getMetric(String key) {
088          AbstractMetric metric = metrics.get(key);
089          return metric != null ? metric.value() : null;
090        }
091    
092        /**
093         * Lookup a metric instance
094         * @param key name of the metric
095         * @return the metric instance
096         */
097        public AbstractMetric getMetricInstance(String key) {
098          return metrics.get(key);
099        }
100    
101        /**
102         * @return the entry set of the tags of the record
103         */
104        public Set<Map.Entry<String, String>> tags() {
105          return tags.entrySet();
106        }
107    
108        /**
109         * @deprecated use metricsEntrySet() instead
110         * @return entry set of metrics
111         */
112        @Deprecated
113        public Set<Map.Entry<String, Number>> metrics() {
114          Map<String, Number> map = new LinkedHashMap<String, Number>(
115              metrics.size());
116          for (Map.Entry<String, AbstractMetric> mapEntry : metrics.entrySet()) {
117            map.put(mapEntry.getKey(), mapEntry.getValue().value());
118          }
119          return map.entrySet();
120        }
121    
122        /**
123         * @return entry set of metrics
124         */
125        public Set<Map.Entry<String, AbstractMetric>> metricsEntrySet() {
126          return metrics.entrySet();
127        }
128    
129        @Override public String toString() {
130          return Objects.toStringHelper(this)
131              .add("tags", tags).add("metrics", metrics)
132              .toString();
133        }
134      }
135    
136      public MetricsCache() {
137        this(MAX_RECS_PER_NAME_DEFAULT);
138      }
139    
140      /**
141       * Construct a metrics cache
142       * @param maxRecsPerName  limit of the number records per record name
143       */
144      public MetricsCache(int maxRecsPerName) {
145        this.maxRecsPerName = maxRecsPerName;
146      }
147    
148      /**
149       * Update the cache and return the current cached record
150       * @param mr the update record
151       * @param includingTags cache tag values (for later lookup by name) if true
152       * @return the updated cache record
153       */
154      public Record update(MetricsRecord mr, boolean includingTags) {
155        String name = mr.name();
156        RecordCache recordCache = map.get(name);
157        if (recordCache == null) {
158          recordCache = new RecordCache();
159          map.put(name, recordCache);
160        }
161        Collection<MetricsTag> tags = mr.tags();
162        Record record = recordCache.get(tags);
163        if (record == null) {
164          record = new Record();
165          recordCache.put(tags, record);
166        }
167        for (AbstractMetric m : mr.metrics()) {
168          record.metrics.put(m.name(), m);
169        }
170        if (includingTags) {
171          // mostly for some sinks that include tags as part of a dense schema
172          for (MetricsTag t : mr.tags()) {
173            record.tags.put(t.name(), t.value());
174          }
175        }
176        return record;
177      }
178    
179      /**
180       * Update the cache and return the current cache record
181       * @param mr the update record
182       * @return the updated cache record
183       */
184      public Record update(MetricsRecord mr) {
185        return update(mr, false);
186      }
187    
188      /**
189       * Get the cached record
190       * @param name of the record
191       * @param tags of the record
192       * @return the cached record or null
193       */
194      public Record get(String name, Collection<MetricsTag> tags) {
195        RecordCache rc = map.get(name);
196        if (rc == null) return null;
197        return rc.get(tags);
198      }
199    }