001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.metrics2.sink;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.net.DatagramPacket;
024import java.net.DatagramSocket;
025import java.net.InetSocketAddress;
026import java.nio.charset.StandardCharsets;
027
028import org.apache.commons.configuration.SubsetConfiguration;
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.metrics2.AbstractMetric;
032import org.apache.hadoop.metrics2.MetricType;
033import org.apache.hadoop.metrics2.MetricsException;
034import org.apache.hadoop.metrics2.MetricsRecord;
035import org.apache.hadoop.metrics2.MetricsSink;
036import org.apache.hadoop.metrics2.MetricsTag;
037import org.apache.hadoop.metrics2.impl.MsInfo;
038import org.apache.hadoop.net.NetUtils;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * A metrics sink that writes metrics to a StatsD daemon.
044 * This sink will produce metrics of the form
045 * '[hostname].servicename.context.name.metricname:value|type'
046 * where hostname is optional. This is useful when sending to
047 * a daemon that is running on the localhost and will add the
048 * hostname to the metric (such as the
049 * <a href="https://collectd.org/">CollectD</a> StatsD plugin).
050 * <br/>
051 * To configure this plugin, you will need to add the following
052 * entries to your hadoop-metrics2.properties file:
053 * <br/>
054 * <pre>
055 * *.sink.statsd.class=org.apache.hadoop.metrics2.sink.StatsDSink
056 * [prefix].sink.statsd.server.host=
057 * [prefix].sink.statsd.server.port=
058 * [prefix].sink.statsd.skip.hostname=true|false (optional)
059 * [prefix].sink.statsd.service.name=NameNode (name you want for service)
060 * </pre>
061 */
062@InterfaceAudience.Public
063@InterfaceStability.Evolving
064public class StatsDSink implements MetricsSink, Closeable {
065  private static final Logger LOG = LoggerFactory.getLogger(StatsDSink.class);
066  private static final String PERIOD = ".";
067  private static final String SERVER_HOST_KEY = "server.host";
068  private static final String SERVER_PORT_KEY = "server.port";
069  private static final String HOST_NAME_KEY = "host.name";
070  private static final String SERVICE_NAME_KEY = "service.name";
071  private static final String SKIP_HOSTNAME_KEY = "skip.hostname";
072  private boolean skipHostname = false;
073  private String hostName = null;
074  private String serviceName = null;
075  private StatsD statsd = null;
076
077  @Override
078  public void init(SubsetConfiguration conf) {
079    // Get StatsD host configurations.
080    final String serverHost = conf.getString(SERVER_HOST_KEY);
081    final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));
082
083    skipHostname = conf.getBoolean(SKIP_HOSTNAME_KEY, false);
084    if (!skipHostname) {
085      hostName = conf.getString(HOST_NAME_KEY, null);
086      if (null == hostName) {
087        hostName = NetUtils.getHostname();
088      }
089    }
090
091    serviceName = conf.getString(SERVICE_NAME_KEY, null);
092
093    statsd = new StatsD(serverHost, serverPort);
094  }
095
096  @Override
097  public void putMetrics(MetricsRecord record) {
098
099    String hn = hostName;
100    String ctx = record.context();
101    String sn = serviceName;
102
103    for (MetricsTag tag : record.tags()) {
104      if (tag.info().name().equals(MsInfo.Hostname.name())
105          && tag.value() != null) {
106        hn = tag.value();
107      } else if (tag.info().name().equals(MsInfo.Context.name())
108          && tag.value() != null) {
109        ctx = tag.value();
110      } else if (tag.info().name().equals(MsInfo.ProcessName.name())
111          && tag.value() != null) {
112        sn = tag.value();
113      }
114    }
115
116    StringBuilder buf = new StringBuilder();
117    if (!skipHostname && hn != null) {
118      int idx = hn.indexOf(".");
119      if (idx == -1) {
120        buf.append(hn).append(PERIOD);
121      } else {
122        buf.append(hn.substring(0, idx)).append(PERIOD);
123      }
124    }
125    buf.append(sn).append(PERIOD);
126    buf.append(ctx).append(PERIOD);
127    buf.append(record.name().replaceAll("\\.", "-")).append(PERIOD);
128
129    // Collect datapoints.
130    for (AbstractMetric metric : record.metrics()) {
131      String type = null;
132      if (metric.type().equals(MetricType.COUNTER)) {
133        type = "c";
134      } else if (metric.type().equals(MetricType.GAUGE)) {
135        type = "g";
136      }
137      StringBuilder line = new StringBuilder();
138      line.append(buf.toString())
139          .append(metric.name().replace(' ', '_'))
140          .append(":")
141          .append(metric.value())
142          .append("|")
143          .append(type);
144      writeMetric(line.toString());
145    }
146
147  }
148
149  public void writeMetric(String line) {
150    try {
151      statsd.write(line);
152    } catch (IOException e) {
153      LOG.warn("Error sending metrics to StatsD", e);
154      throw new MetricsException("Error writing metric to StatsD", e);
155    }
156  }
157
158  @Override
159  public void flush() {
160  }
161
162  @Override
163  public void close() throws IOException {
164    statsd.close();
165  }
166
167  /**
168   * Class that sends UDP packets to StatsD daemon.
169   *
170   */
171  public static class StatsD {
172
173    private DatagramSocket socket = null;
174    private DatagramPacket packet = null;
175    private String serverHost;
176    private int serverPort;
177
178    public StatsD(String serverHost, int serverPort) {
179      this.serverHost = serverHost;
180      this.serverPort = serverPort;
181    }
182
183    public void createSocket() throws IOException {
184      try {
185        InetSocketAddress address =
186            new InetSocketAddress(this.serverHost, this.serverPort);
187        socket = new DatagramSocket();
188        packet =
189            new DatagramPacket("".getBytes(StandardCharsets.UTF_8), 0, 0,
190                address.getAddress(), this.serverPort);
191      } catch (IOException ioe) {
192        throw NetUtils.wrapException(this.serverHost, this.serverPort,
193            "localhost", 0, ioe);
194      }
195    }
196
197    public void write(String msg) throws IOException {
198      if (null == socket) {
199        createSocket();
200      }
201      LOG.debug("Sending metric: {}", msg);
202      packet.setData(msg.getBytes(StandardCharsets.UTF_8));
203      socket.send(packet);
204    }
205
206    public void close() throws IOException {
207      try {
208        if (socket != null) {
209          socket.close();
210        }
211      } finally {
212        socket = null;
213      }
214    }
215
216  }
217
218}