001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.metrics2.lib;
020    
021    import static org.apache.hadoop.metrics2.lib.Interns.info;
022    
023    import java.util.Map;
024    import java.util.concurrent.Executors;
025    import java.util.concurrent.ScheduledExecutorService;
026    import java.util.concurrent.TimeUnit;
027    
028    import org.apache.commons.lang.StringUtils;
029    import org.apache.hadoop.classification.InterfaceAudience;
030    import org.apache.hadoop.classification.InterfaceStability;
031    import org.apache.hadoop.metrics2.MetricsInfo;
032    import org.apache.hadoop.metrics2.MetricsRecordBuilder;
033    import org.apache.hadoop.metrics2.util.Quantile;
034    import org.apache.hadoop.metrics2.util.SampleQuantiles;
035    
036    import com.google.common.annotations.VisibleForTesting;
037    import com.google.common.util.concurrent.ThreadFactoryBuilder;
038    
039    /**
040     * Watches a stream of long values, maintaining online estimates of specific
041     * quantiles with provably low error bounds. This is particularly useful for
042     * accurate high-percentile (e.g. 95th, 99th) latency metrics.
043     */
044    @InterfaceAudience.Public
045    @InterfaceStability.Evolving
046    public class MutableQuantiles extends MutableMetric {
047    
048      @VisibleForTesting
049      public static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
050          new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
051          new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
052    
053      private final MetricsInfo numInfo;
054      private final MetricsInfo[] quantileInfos;
055      private final int interval;
056    
057      private SampleQuantiles estimator;
058      private long previousCount = 0;
059    
060      @VisibleForTesting
061      protected Map<Quantile, Long> previousSnapshot = null;
062    
063      private static final ScheduledExecutorService scheduler = Executors
064          .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
065              .setNameFormat("MutableQuantiles-%d").build());
066    
067      /**
068       * Instantiates a new {@link MutableQuantiles} for a metric that rolls itself
069       * over on the specified time interval.
070       * 
071       * @param name
072       *          of the metric
073       * @param description
074       *          long-form textual description of the metric
075       * @param sampleName
076       *          type of items in the stream (e.g., "Ops")
077       * @param valueName
078       *          type of the values
079       * @param interval
080       *          rollover interval (in seconds) of the estimator
081       */
082      public MutableQuantiles(String name, String description, String sampleName,
083          String valueName, int interval) {
084        String ucName = StringUtils.capitalize(name);
085        String usName = StringUtils.capitalize(sampleName);
086        String uvName = StringUtils.capitalize(valueName);
087        String desc = StringUtils.uncapitalize(description);
088        String lsName = StringUtils.uncapitalize(sampleName);
089        String lvName = StringUtils.uncapitalize(valueName);
090    
091        numInfo = info(ucName + "Num" + usName, String.format(
092            "Number of %s for %s with %ds interval", lsName, desc, interval));
093        // Construct the MetricsInfos for the quantiles, converting to percentiles
094        quantileInfos = new MetricsInfo[quantiles.length];
095        String nameTemplate = ucName + "%dthPercentile" + uvName;
096        String descTemplate = "%d percentile " + lvName + " with " + interval
097            + " second interval for " + desc;
098        for (int i = 0; i < quantiles.length; i++) {
099          int percentile = (int) (100 * quantiles[i].quantile);
100          quantileInfos[i] = info(String.format(nameTemplate, percentile),
101              String.format(descTemplate, percentile));
102        }
103    
104        estimator = new SampleQuantiles(quantiles);
105    
106        this.interval = interval;
107        scheduler.scheduleAtFixedRate(new RolloverSample(this), interval, interval,
108            TimeUnit.SECONDS);
109      }
110    
111      @Override
112      public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
113        if (all || changed()) {
114          builder.addGauge(numInfo, previousCount);
115          for (int i = 0; i < quantiles.length; i++) {
116            long newValue = 0;
117            // If snapshot is null, we failed to update since the window was empty
118            if (previousSnapshot != null) {
119              newValue = previousSnapshot.get(quantiles[i]);
120            }
121            builder.addGauge(quantileInfos[i], newValue);
122          }
123          if (changed()) {
124            clearChanged();
125          }
126        }
127      }
128    
129      public synchronized void add(long value) {
130        estimator.insert(value);
131      }
132    
133      public int getInterval() {
134        return interval;
135      }
136    
137      /**
138       * Runnable used to periodically roll over the internal
139       * {@link SampleQuantiles} every interval.
140       */
141      private static class RolloverSample implements Runnable {
142    
143        MutableQuantiles parent;
144    
145        public RolloverSample(MutableQuantiles parent) {
146          this.parent = parent;
147        }
148    
149        @Override
150        public void run() {
151          synchronized (parent) {
152            parent.previousCount = parent.estimator.getCount();
153            parent.previousSnapshot = parent.estimator.snapshot();
154            parent.estimator.clear();
155          }
156          parent.setChanged();
157        }
158    
159      }
160    }