/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink;

import java.io.Closeable;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricType;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.impl.MsInfo;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A metrics sink that writes metrics to a StatsD daemon.
 * This sink will produce metrics of the form
 * '[hostname].servicename.context.name.metricname:value|type'
 * where hostname is optional. Omitting the hostname is useful when sending
 * to a daemon that is running on the localhost and will add the
 * hostname to the metric (such as the
 * <a href="https://collectd.org/">CollectD</a> StatsD plugin).
 * <br>
 * To configure this plugin, you will need to add the following
 * entries to your hadoop-metrics2.properties file:
 * <br>
 * <pre>
 * *.sink.statsd.class=org.apache.hadoop.metrics2.sink.StatsDSink
 * [prefix].sink.statsd.server.host=
 * [prefix].sink.statsd.server.port=
 * [prefix].sink.statsd.skip.hostname=true|false (optional)
 * [prefix].sink.statsd.host.name= (optional, defaults to NetUtils.getHostname())
 * [prefix].sink.statsd.service.name=NameNode (name you want for service)
 * </pre>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class StatsDSink implements MetricsSink, Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(StatsDSink.class);
  private static final String PERIOD = ".";
  private static final String SERVER_HOST_KEY = "server.host";
  private static final String SERVER_PORT_KEY = "server.port";
  private static final String HOST_NAME_KEY = "host.name";
  private static final String SERVICE_NAME_KEY = "service.name";
  private static final String SKIP_HOSTNAME_KEY = "skip.hostname";
  private boolean skipHostname = false;
  private String hostName = null;
  private String serviceName = null;
  private StatsD statsd = null;

  /**
   * Reads the sink settings from the configuration and creates the
   * {@link StatsD} sender. No socket is opened here; the sender opens one
   * on its first write.
   *
   * @param conf the configuration subset for this sink
   * @throws NumberFormatException if the {@code server.port} entry is
   *         missing or is not a valid integer
   */
  @Override
  public void init(SubsetConfiguration conf) {
    // Get StatsD host configurations.
    final String serverHost = conf.getString(SERVER_HOST_KEY);
    final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));

    skipHostname = conf.getBoolean(SKIP_HOSTNAME_KEY, false);
    if (!skipHostname) {
      hostName = conf.getString(HOST_NAME_KEY, null);
      if (null == hostName) {
        hostName = NetUtils.getHostname();
      }
    }

    serviceName = conf.getString(SERVICE_NAME_KEY, null);

    statsd = new StatsD(serverHost, serverPort);
  }

  /**
   * Formats every metric of the record as a StatsD line and sends it.
   * The hostname, context and service name default to the configured host
   * name, the record context and the configured service name; each is
   * overridden by a non-null Hostname, Context or ProcessName tag on the
   * record.
   *
   * @param record the metrics record to publish
   * @throws MetricsException if a line cannot be written
   */
  @Override
  public void putMetrics(MetricsRecord record) {
    String hn = hostName;
    String ctx = record.context();
    String sn = serviceName;

    for (MetricsTag tag : record.tags()) {
      final String tagName = tag.info().name();
      final String tagValue = tag.value();
      if (tagValue == null) {
        continue;
      }
      if (tagName.equals(MsInfo.Hostname.name())) {
        hn = tagValue;
      } else if (tagName.equals(MsInfo.Context.name())) {
        ctx = tagValue;
      } else if (tagName.equals(MsInfo.ProcessName.name())) {
        sn = tagValue;
      }
    }

    StringBuilder buf = new StringBuilder();
    if (!skipHostname && hn != null) {
      // Only the part before the first '.' is used, because '.' is the
      // separator between the components of the metric name.
      int idx = hn.indexOf('.');
      buf.append(idx == -1 ? hn : hn.substring(0, idx)).append(PERIOD);
    }
    buf.append(sn).append(PERIOD);
    buf.append(ctx).append(PERIOD);
    buf.append(record.name().replace('.', '-')).append(PERIOD);
    // The prefix is identical for every metric of the record.
    final String prefix = buf.toString();

    // Collect datapoints.
    for (AbstractMetric metric : record.metrics()) {
      final String type = statsdType(metric.type());
      if (type == null) {
        // Without a type suffix the line would end in "|null".
        LOG.debug("Skipping metric {} with unsupported type {}",
            metric.name(), metric.type());
        continue;
      }
      writeMetric(prefix + metric.name().replace(' ', '_')
          + ":" + metric.value() + "|" + type);
    }
  }

  /**
   * Maps a metric type to the StatsD type suffix used by this sink.
   *
   * @param type the metric type, may be null
   * @return "c" for COUNTER, "g" for GAUGE, or null for any other value
   */
  private static String statsdType(MetricType type) {
    if (MetricType.COUNTER.equals(type)) {
      return "c";
    }
    if (MetricType.GAUGE.equals(type)) {
      return "g";
    }
    return null;
  }

  /**
   * Sends one already formatted line to the StatsD daemon.
   *
   * @param line the line to send
   * @throws MetricsException if the underlying write fails with an
   *         IOException; the failure is also logged at WARN level
   */
  public void writeMetric(String line) {
    try {
      statsd.write(line);
    } catch (IOException e) {
      LOG.warn("Error sending metrics to StatsD", e);
      throw new MetricsException("Error writing metric to StatsD", e);
    }
  }

  /**
   * Does nothing; every line is sent as soon as it is written.
   */
  @Override
  public void flush() {
  }

  /**
   * Closes the StatsD sender if {@link #init} has created one.
   *
   * @throws IOException if closing the sender fails
   */
  @Override
  public void close() throws IOException {
    // init() may never have been called, in which case there is nothing
    // to close.
    if (statsd != null) {
      statsd.close();
    }
  }

  /**
   * Class that sends UDP packets to StatsD daemon.
   *
   */
  public static class StatsD {

    private DatagramSocket socket = null;
    private DatagramPacket packet = null;
    private final String serverHost;
    private final int serverPort;

    /**
     * Stores the destination; no socket is opened until the first write.
     *
     * @param serverHost host name or address of the StatsD daemon
     * @param serverPort UDP port of the StatsD daemon
     */
    public StatsD(String serverHost, int serverPort) {
      this.serverHost = serverHost;
      this.serverPort = serverPort;
    }

    /**
     * Resolves the server address, opens a datagram socket and prepares the
     * packet that is reused for every write.
     *
     * @throws IOException if the host cannot be resolved or the socket
     *         cannot be opened, wrapped by {@link NetUtils#wrapException}
     */
    public void createSocket() throws IOException {
      try {
        InetSocketAddress address =
            new InetSocketAddress(this.serverHost, this.serverPort);
        // getAddress() returns null for an unresolved host. Fail here,
        // leaving socket null, so that the next write resolves again.
        if (address.isUnresolved()) {
          throw new UnknownHostException(this.serverHost);
        }
        final InetAddress target = address.getAddress();
        // Build the packet before opening the socket and publish both
        // fields together, so a failure cannot leave a socket without a
        // packet.
        final DatagramPacket newPacket =
            new DatagramPacket("".getBytes(StandardCharsets.UTF_8), 0, 0,
                target, this.serverPort);
        final DatagramSocket newSocket = new DatagramSocket();
        packet = newPacket;
        socket = newSocket;
      } catch (IOException ioe) {
        throw NetUtils.wrapException(this.serverHost, this.serverPort,
            "localhost", 0, ioe);
      }
    }

    /**
     * Sends the message as one UTF-8 encoded datagram, opening the socket
     * first if there is none.
     *
     * @param msg the message to send
     * @throws IOException if the socket cannot be created or the send fails
     */
    public void write(String msg) throws IOException {
      if (null == socket) {
        createSocket();
      }
      LOG.debug("Sending metric: {}", msg);
      packet.setData(msg.getBytes(StandardCharsets.UTF_8));
      socket.send(packet);
    }

    /**
     * Closes the socket if one is open. A later write opens a new one.
     *
     * @throws IOException declared for callers; not thrown by the calls
     *         made here
     */
    public void close() throws IOException {
      try {
        if (socket != null) {
          socket.close();
        }
      } finally {
        socket = null;
      }
    }

  }

}