/*
 * GangliaContext.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics.ganglia;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;
import org.apache.hadoop.metrics.spi.Util;

/**
 * Context for sending metrics to Ganglia.
 *
 * <p>Every metric is serialized into {@link #buffer} with the {@code xdr_*}
 * helpers and then sent as one UDP datagram to each address in
 * {@link #metricsServers}. The buffer and its write position
 * ({@link #offset}) are plain instance fields and this class performs no
 * synchronization around them.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class GangliaContext extends AbstractMetricsContext {

  // Names of the configuration attributes read in init().
  private static final String PERIOD_PROPERTY = "period";
  private static final String SERVERS_PROPERTY = "servers";
  private static final String UNITS_PROPERTY = "units";
  private static final String SLOPE_PROPERTY = "slope";
  private static final String TMAX_PROPERTY = "tmax";
  private static final String DMAX_PROPERTY = "dmax";

  // Values used when a metric has no entry in the corresponding table.
  private static final String DEFAULT_UNITS = "";
  private static final String DEFAULT_SLOPE = "both";
  private static final int DEFAULT_TMAX = 60;
  private static final int DEFAULT_DMAX = 0;
  private static final int DEFAULT_PORT = 8649;
  private static final int BUFFER_SIZE = 1500;        // as per libgmond.c

  /**
   * Charset used to encode strings written into the packet. Fixed explicitly
   * so the bytes on the wire do not depend on the JVM's default charset.
   */
  private static final Charset UTF8 = Charset.forName("UTF-8");

  private final Log LOG = LogFactory.getLog(this.getClass());

  /** Maps the Java class of a metric value to the type name sent to Ganglia. */
  private static final Map<Class<?>,String> typeTable =
    new HashMap<Class<?>,String>(5);

  static {
    typeTable.put(String.class, "string");
    typeTable.put(Byte.class, "int8");
    typeTable.put(Short.class, "int16");
    typeTable.put(Integer.class, "int32");
    // NOTE(review): Long is reported as "float", not as an integer type.
    // Presumably deliberate (no 64-bit integer type name is used anywhere in
    // this table) -- confirm before changing, as it alters what is sent.
    typeTable.put(Long.class, "float");
    typeTable.put(Float.class, "float");
  }

  /** Scratch buffer the current packet is assembled in. */
  protected byte[] buffer = new byte[BUFFER_SIZE];

  /** Index in {@link #buffer} where the next byte will be written. */
  protected int offset;

  /** Destinations every metric packet is sent to; set in {@code init}. */
  protected List<? extends SocketAddress> metricsServers;

  // Per-metric overrides keyed by the fully qualified metric name.
  private Map<String,String> unitsTable;
  private Map<String,String> slopeTable;
  private Map<String,String> tmaxTable;
  private Map<String,String> dmaxTable;

  /** Socket used for sending; remains {@code null} if creation failed. */
  protected DatagramSocket datagramSocket;

  /** Creates a new instance of GangliaContext */
  @InterfaceAudience.Private
  public GangliaContext() {
  }

  /**
   * Initializes the context: reads the period, the server list and the
   * per-metric units/slope/tmax/dmax tables, then opens the datagram socket.
   *
   * <p>If the socket cannot be created the failure is logged and
   * {@link #datagramSocket} is left {@code null}; initialization itself does
   * not fail.
   *
   * @param contextName name of this metrics context
   * @param factory factory passed through to the superclass initializer
   */
  @InterfaceAudience.Private
  public void init(String contextName, ContextFactory factory) {
    super.init(contextName, factory);
    parseAndSetPeriod(PERIOD_PROPERTY);

    metricsServers =
      Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT);

    unitsTable = getAttributeTable(UNITS_PROPERTY);
    slopeTable = getAttributeTable(SLOPE_PROPERTY);
    tmaxTable  = getAttributeTable(TMAX_PROPERTY);
    dmaxTable  = getAttributeTable(DMAX_PROPERTY);

    try {
      datagramSocket = new DatagramSocket();
    } catch (SocketException se) {
      // Report through the logger rather than stderr so the failure shows up
      // with the rest of the application's diagnostics.
      LOG.error("Unable to create datagram socket for Ganglia metrics", se);
    }
  }

  /**
   * method to close the datagram socket
   */
  @Override
  public void close() {
    super.close();
    if (datagramSocket != null) {
      datagramSocket.close();
    }
  }

  /**
   * Emits every metric of one record, each under the name
   * {@code contextName.[processName.]recordName.metricName}. The
   * {@code processName} segment is inserted only for the {@code "jvm"}
   * context when the record carries a {@code processName} tag.
   *
   * <p>Metrics whose value class has no entry in the type table are skipped
   * with a warning.
   *
   * @param contextName name of the context the record belongs to
   * @param recordName name of the record
   * @param outRec the record whose metrics are emitted
   * @throws IOException if sending a metric fails
   */
  @InterfaceAudience.Private
  public void emitRecord(String contextName, String recordName,
    OutputRecord outRec)
  throws IOException {
    // Setup so that the records have the proper leader names so they are
    // unambiguous at the ganglia level, and this prevents a lot of rework
    StringBuilder sb = new StringBuilder();
    sb.append(contextName);
    sb.append('.');

    if (contextName.equals("jvm") && outRec.getTag("processName") != null) {
      sb.append(outRec.getTag("processName"));
      sb.append('.');
    }

    sb.append(recordName);
    sb.append('.');
    int sbBaseLen = sb.length();

    // emit each metric in turn
    for (String metricName : outRec.getMetricNames()) {
      Object metric = outRec.getMetric(metricName);
      String type = typeTable.get(metric.getClass());
      if (type != null) {
        sb.append(metricName);
        emitMetric(sb.toString(), type, metric.toString());
        // Truncate back to the shared prefix for the next metric.
        sb.setLength(sbBaseLen);
      } else {
        LOG.warn("Unknown metrics type: " + metric.getClass());
      }
    }
  }

  /**
   * Serializes one metric into {@link #buffer} and sends it to every
   * configured server.
   *
   * @param name fully qualified metric name
   * @param type type name to send for the value
   * @param value metric value rendered as a string
   * @throws IOException if the datagram socket is unavailable (its creation
   *         failed in {@code init}) or if sending a packet fails
   */
  protected void emitMetric(String name, String type, String value)
  throws IOException {
    if (datagramSocket == null) {
      // init() logs and continues when the socket cannot be created; fail
      // with a descriptive, declared exception instead of a bare NPE.
      throw new IOException(
          "Datagram socket is not available; cannot send metric " + name);
    }

    String units = getUnits(name);
    int slope = getSlope(name);
    int tmax = getTmax(name);
    int dmax = getDmax(name);

    offset = 0;
    xdr_int(0);             // metric_user_defined
    xdr_string(type);
    xdr_string(name);
    xdr_string(value);
    xdr_string(units);
    xdr_int(slope);
    xdr_int(tmax);
    xdr_int(dmax);

    for (SocketAddress socketAddress : metricsServers) {
      DatagramPacket packet =
        new DatagramPacket(buffer, offset, socketAddress);
      datagramSocket.send(packet);
    }
  }

  /**
   * Returns the units configured for a metric.
   *
   * @param metricName fully qualified metric name
   * @return the configured units, or the empty default when none are set
   */
  protected String getUnits(String metricName) {
    String result = lookup(unitsTable, metricName);
    return (result == null) ? DEFAULT_UNITS : result;
  }

  /**
   * Returns the numeric slope code for a metric.
   *
   * @param metricName fully qualified metric name
   * @return {@code 0} when the configured slope is {@code "zero"}, otherwise
   *         {@code 3} (also used when nothing is configured)
   */
  protected int getSlope(String metricName) {
    String slopeString = lookup(slopeTable, metricName);
    if (slopeString == null) {
      slopeString = DEFAULT_SLOPE;
    }
    return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
  }

  /**
   * Returns the tmax value for a metric.
   *
   * @param metricName fully qualified metric name
   * @return the configured value parsed as an int, or the default
   * @throws NumberFormatException if the configured value is not an integer
   */
  protected int getTmax(String metricName) {
    String tmaxString = lookup(tmaxTable, metricName);
    return (tmaxString == null) ? DEFAULT_TMAX : Integer.parseInt(tmaxString);
  }

  /**
   * Returns the dmax value for a metric.
   *
   * @param metricName fully qualified metric name
   * @return the configured value parsed as an int, or the default
   * @throws NumberFormatException if the configured value is not an integer
   */
  protected int getDmax(String metricName) {
    String dmaxString = lookup(dmaxTable, metricName);
    return (dmaxString == null) ? DEFAULT_DMAX : Integer.parseInt(dmaxString);
  }

  /**
   * Looks a metric up in an attribute table, treating a table that has not
   * been initialized the same as a table without an entry for the metric.
   */
  private static String lookup(Map<String,String> table, String metricName) {
    return (table == null) ? null : table.get(metricName);
  }

  /**
   * Puts a string into the buffer by first writing the size of the string
   * as an int, followed by the bytes of the string, padded if necessary to
   * a multiple of 4. The string is encoded as UTF-8.
   *
   * <p>No bounds check is made against the buffer size; a string that does
   * not fit in the remaining space results in an
   * {@link IndexOutOfBoundsException}.
   */
  protected void xdr_string(String s) {
    byte[] bytes = s.getBytes(UTF8);
    int len = bytes.length;
    xdr_int(len);
    System.arraycopy(bytes, 0, buffer, offset, len);
    offset += len;
    pad();
  }

  /**
   * Pads the buffer with zero bytes up to the nearest multiple of 4.
   */
  private void pad() {
    int newOffset = ((offset + 3) / 4) * 4;
    while (offset < newOffset) {
      buffer[offset++] = 0;
    }
  }

  /**
   * Puts an integer into the buffer as 4 bytes, big-endian.
   */
  protected void xdr_int(int i) {
    buffer[offset++] = (byte)((i >> 24) & 0xff);
    buffer[offset++] = (byte)((i >> 16) & 0xff);
    buffer[offset++] = (byte)((i >> 8) & 0xff);
    buffer[offset++] = (byte)(i & 0xff);
  }
}