001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.metrics2.sink;
020
021 import java.io.IOException;
022 import java.io.OutputStreamWriter;
023 import java.io.Writer;
024 import java.io.Closeable;
025 import java.net.Socket;
026
027 import org.apache.commons.configuration.SubsetConfiguration;
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.hadoop.classification.InterfaceAudience;
031 import org.apache.hadoop.classification.InterfaceStability;
032 import org.apache.hadoop.io.IOUtils;
033 import org.apache.hadoop.metrics2.AbstractMetric;
034 import org.apache.hadoop.metrics2.MetricsException;
035 import org.apache.hadoop.metrics2.MetricsRecord;
036 import org.apache.hadoop.metrics2.MetricsSink;
037 import org.apache.hadoop.metrics2.MetricsTag;
038
039 /**
040 * A metrics sink that writes to a Graphite server
041 */
042 @InterfaceAudience.Public
043 @InterfaceStability.Evolving
044 public class GraphiteSink implements MetricsSink, Closeable {
045 private static final Log LOG = LogFactory.getLog(GraphiteSink.class);
046 private static final String SERVER_HOST_KEY = "server_host";
047 private static final String SERVER_PORT_KEY = "server_port";
048 private static final String METRICS_PREFIX = "metrics_prefix";
049 private Writer writer = null;
050 private String metricsPrefix = null;
051 private Socket socket = null;
052
053 @Override
054 public void init(SubsetConfiguration conf) {
055 // Get Graphite host configurations.
056 String serverHost = conf.getString(SERVER_HOST_KEY);
057 Integer serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));
058
059 // Get Graphite metrics graph prefix.
060 metricsPrefix = conf.getString(METRICS_PREFIX);
061 if (metricsPrefix == null)
062 metricsPrefix = "";
063
064 try {
065 // Open an connection to Graphite server.
066 socket = new Socket(serverHost, serverPort);
067 writer = new OutputStreamWriter(socket.getOutputStream());
068 } catch (Exception e) {
069 throw new MetricsException("Error creating connection, "
070 + serverHost + ":" + serverPort, e);
071 }
072 }
073
074 @Override
075 public void putMetrics(MetricsRecord record) {
076 StringBuilder lines = new StringBuilder();
077 StringBuilder metricsPathPrefix = new StringBuilder();
078
079 // Configure the hierarchical place to display the graph.
080 metricsPathPrefix.append(metricsPrefix).append(".")
081 .append(record.context()).append(".").append(record.name());
082
083 for (MetricsTag tag : record.tags()) {
084 if (tag.value() != null) {
085 metricsPathPrefix.append(".");
086 metricsPathPrefix.append(tag.name());
087 metricsPathPrefix.append("=");
088 metricsPathPrefix.append(tag.value());
089 }
090 }
091
092 // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds.
093 long timestamp = record.timestamp() / 1000L;
094
095 // Collect datapoints.
096 for (AbstractMetric metric : record.metrics()) {
097 lines.append(
098 metricsPathPrefix.toString() + "."
099 + metric.name().replace(' ', '.')).append(" ")
100 .append(metric.value()).append(" ").append(timestamp)
101 .append("\n");
102 }
103
104 try {
105 if(writer != null){
106 writer.write(lines.toString());
107 } else {
108 throw new MetricsException("Writer in GraphiteSink is null!");
109 }
110 } catch (Exception e) {
111 throw new MetricsException("Error sending metrics", e);
112 }
113 }
114
115 @Override
116 public void flush() {
117 try {
118 writer.flush();
119 } catch (Exception e) {
120 throw new MetricsException("Error flushing metrics", e);
121 }
122 }
123
124 @Override
125 public void close() throws IOException {
126 try {
127 IOUtils.closeStream(writer);
128 writer = null;
129 LOG.info("writer in GraphiteSink is closed!");
130 } catch (Throwable e){
131 throw new MetricsException("Error closing writer", e);
132 } finally {
133 if (socket != null && !socket.isClosed()) {
134 socket.close();
135 socket = null;
136 LOG.info("socket in GraphiteSink is closed!");
137 }
138 }
139 }
140 }