001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.metrics2.sink; 020 021import org.apache.commons.configuration.SubsetConfiguration; 022import org.apache.commons.io.Charsets; 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceStability; 027import org.apache.hadoop.metrics2.AbstractMetric; 028import org.apache.hadoop.metrics2.MetricsException; 029import org.apache.hadoop.metrics2.MetricsRecord; 030import org.apache.hadoop.metrics2.MetricsSink; 031import org.apache.hadoop.metrics2.MetricsTag; 032 033import java.io.Closeable; 034import java.io.IOException; 035import java.io.OutputStreamWriter; 036import java.io.Writer; 037import java.net.Socket; 038 039/** 040 * A metrics sink that writes to a Graphite server 041 */ 042@InterfaceAudience.Public 043@InterfaceStability.Evolving 044public class GraphiteSink implements MetricsSink, Closeable { 045 private static final Log LOG = LogFactory.getLog(GraphiteSink.class); 046 private static final String SERVER_HOST_KEY = "server_host"; 047 private static final String SERVER_PORT_KEY = "server_port"; 048 private static final String METRICS_PREFIX = "metrics_prefix"; 049 private String metricsPrefix = null; 050 private Graphite graphite = null; 051 052 @Override 053 public void init(SubsetConfiguration conf) { 054 // Get Graphite host configurations. 055 final String serverHost = conf.getString(SERVER_HOST_KEY); 056 final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); 057 058 // Get Graphite metrics graph prefix. 059 metricsPrefix = conf.getString(METRICS_PREFIX); 060 if (metricsPrefix == null) 061 metricsPrefix = ""; 062 063 graphite = new Graphite(serverHost, serverPort); 064 graphite.connect(); 065 } 066 067 @Override 068 public void putMetrics(MetricsRecord record) { 069 StringBuilder lines = new StringBuilder(); 070 StringBuilder metricsPathPrefix = new StringBuilder(); 071 072 // Configure the hierarchical place to display the graph. 073 metricsPathPrefix.append(metricsPrefix).append(".") 074 .append(record.context()).append(".").append(record.name()); 075 076 for (MetricsTag tag : record.tags()) { 077 if (tag.value() != null) { 078 metricsPathPrefix.append("."); 079 metricsPathPrefix.append(tag.name()); 080 metricsPathPrefix.append("="); 081 metricsPathPrefix.append(tag.value()); 082 } 083 } 084 085 // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds. 086 long timestamp = record.timestamp() / 1000L; 087 088 // Collect datapoints. 089 for (AbstractMetric metric : record.metrics()) { 090 lines.append( 091 metricsPathPrefix.toString() + "." 092 + metric.name().replace(' ', '.')).append(" ") 093 .append(metric.value()).append(" ").append(timestamp) 094 .append("\n"); 095 } 096 097 try { 098 graphite.write(lines.toString()); 099 } catch (Exception e) { 100 LOG.warn("Error sending metrics to Graphite", e); 101 try { 102 graphite.close(); 103 } catch (Exception e1) { 104 throw new MetricsException("Error closing connection to Graphite", e1); 105 } 106 } 107 } 108 109 @Override 110 public void flush() { 111 try { 112 graphite.flush(); 113 } catch (Exception e) { 114 LOG.warn("Error flushing metrics to Graphite", e); 115 try { 116 graphite.close(); 117 } catch (Exception e1) { 118 throw new MetricsException("Error closing connection to Graphite", e1); 119 } 120 } 121 } 122 123 @Override 124 public void close() throws IOException { 125 graphite.close(); 126 } 127 128 public static class Graphite { 129 private final static int MAX_CONNECTION_FAILURES = 5; 130 131 private String serverHost; 132 private int serverPort; 133 private Writer writer = null; 134 private Socket socket = null; 135 private int connectionFailures = 0; 136 137 public Graphite(String serverHost, int serverPort) { 138 this.serverHost = serverHost; 139 this.serverPort = serverPort; 140 } 141 142 public void connect() { 143 if (isConnected()) { 144 throw new MetricsException("Already connected to Graphite"); 145 } 146 if (tooManyConnectionFailures()) { 147 // return silently (there was ERROR in logs when we reached limit for the first time) 148 return; 149 } 150 try { 151 // Open a connection to Graphite server. 152 socket = new Socket(serverHost, serverPort); 153 writer = new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8); 154 } catch (Exception e) { 155 connectionFailures++; 156 if (tooManyConnectionFailures()) { 157 // first time when connection limit reached, report to logs 158 LOG.error("Too many connection failures, would not try to connect again."); 159 } 160 throw new MetricsException("Error creating connection, " 161 + serverHost + ":" + serverPort, e); 162 } 163 } 164 165 public void write(String msg) throws IOException { 166 if (!isConnected()) { 167 connect(); 168 } 169 if (isConnected()) { 170 writer.write(msg); 171 } 172 } 173 174 public void flush() throws IOException { 175 if (isConnected()) { 176 writer.flush(); 177 } 178 } 179 180 public boolean isConnected() { 181 return socket != null && socket.isConnected() && !socket.isClosed(); 182 } 183 184 public void close() throws IOException { 185 try { 186 if (writer != null) { 187 writer.close(); 188 } 189 } catch (IOException ex) { 190 if (socket != null) { 191 socket.close(); 192 } 193 } finally { 194 socket = null; 195 writer = null; 196 } 197 } 198 199 private boolean tooManyConnectionFailures() { 200 return connectionFailures > MAX_CONNECTION_FAILURES; 201 } 202 203 } 204 205}