001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.metrics2.sink; 020 021 import java.io.IOException; 022 import java.io.OutputStreamWriter; 023 import java.io.Writer; 024 import java.io.Closeable; 025 import java.net.Socket; 026 027 import org.apache.commons.configuration.SubsetConfiguration; 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 import org.apache.hadoop.classification.InterfaceAudience; 031 import org.apache.hadoop.classification.InterfaceStability; 032 import org.apache.hadoop.io.IOUtils; 033 import org.apache.hadoop.metrics2.AbstractMetric; 034 import org.apache.hadoop.metrics2.MetricsException; 035 import org.apache.hadoop.metrics2.MetricsRecord; 036 import org.apache.hadoop.metrics2.MetricsSink; 037 import org.apache.hadoop.metrics2.MetricsTag; 038 039 /** 040 * A metrics sink that writes to a Graphite server 041 */ 042 @InterfaceAudience.Public 043 @InterfaceStability.Evolving 044 public class GraphiteSink implements MetricsSink, Closeable { 045 private static final Log LOG = LogFactory.getLog(GraphiteSink.class); 046 private static final String SERVER_HOST_KEY = "server_host"; 047 private static final String SERVER_PORT_KEY = "server_port"; 048 private static final String METRICS_PREFIX = "metrics_prefix"; 049 private Writer writer = null; 050 private String metricsPrefix = null; 051 private Socket socket = null; 052 053 @Override 054 public void init(SubsetConfiguration conf) { 055 // Get Graphite host configurations. 056 String serverHost = conf.getString(SERVER_HOST_KEY); 057 Integer serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); 058 059 // Get Graphite metrics graph prefix. 060 metricsPrefix = conf.getString(METRICS_PREFIX); 061 if (metricsPrefix == null) 062 metricsPrefix = ""; 063 064 try { 065 // Open an connection to Graphite server. 066 socket = new Socket(serverHost, serverPort); 067 writer = new OutputStreamWriter(socket.getOutputStream()); 068 } catch (Exception e) { 069 throw new MetricsException("Error creating connection, " 070 + serverHost + ":" + serverPort, e); 071 } 072 } 073 074 @Override 075 public void putMetrics(MetricsRecord record) { 076 StringBuilder lines = new StringBuilder(); 077 StringBuilder metricsPathPrefix = new StringBuilder(); 078 079 // Configure the hierarchical place to display the graph. 080 metricsPathPrefix.append(metricsPrefix).append(".") 081 .append(record.context()).append(".").append(record.name()); 082 083 for (MetricsTag tag : record.tags()) { 084 if (tag.value() != null) { 085 metricsPathPrefix.append("."); 086 metricsPathPrefix.append(tag.name()); 087 metricsPathPrefix.append("="); 088 metricsPathPrefix.append(tag.value()); 089 } 090 } 091 092 // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds. 093 long timestamp = record.timestamp() / 1000L; 094 095 // Collect datapoints. 096 for (AbstractMetric metric : record.metrics()) { 097 lines.append( 098 metricsPathPrefix.toString() + "." 099 + metric.name().replace(' ', '.')).append(" ") 100 .append(metric.value()).append(" ").append(timestamp) 101 .append("\n"); 102 } 103 104 try { 105 if(writer != null){ 106 writer.write(lines.toString()); 107 } else { 108 throw new MetricsException("Writer in GraphiteSink is null!"); 109 } 110 } catch (Exception e) { 111 throw new MetricsException("Error sending metrics", e); 112 } 113 } 114 115 @Override 116 public void flush() { 117 try { 118 writer.flush(); 119 } catch (Exception e) { 120 throw new MetricsException("Error flushing metrics", e); 121 } 122 } 123 124 @Override 125 public void close() throws IOException { 126 try { 127 IOUtils.closeStream(writer); 128 writer = null; 129 LOG.info("writer in GraphiteSink is closed!"); 130 } catch (Throwable e){ 131 throw new MetricsException("Error closing writer", e); 132 } finally { 133 if (socket != null && !socket.isClosed()) { 134 socket.close(); 135 socket = null; 136 LOG.info("socket in GraphiteSink is closed!"); 137 } 138 } 139 } 140 }