/*
 * GangliaContext.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics.ganglia;

import java.io.IOException;
import java.net.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;
import org.apache.hadoop.metrics.spi.Util;

/**
 * Context for sending metrics to Ganglia.
 *
 * <p>Every metric is XDR-encoded into a single reusable byte buffer and sent
 * as one datagram to each configured server. The buffer and write offset are
 * plain instance fields that are overwritten on every call to
 * {@link #emitMetric(String, String, String)}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class GangliaContext extends AbstractMetricsContext {

  // Names of the configuration attributes read in init().
  private static final String PERIOD_PROPERTY = "period";
  private static final String SERVERS_PROPERTY = "servers";
  private static final String UNITS_PROPERTY = "units";
  private static final String SLOPE_PROPERTY = "slope";
  private static final String TMAX_PROPERTY = "tmax";
  private static final String DMAX_PROPERTY = "dmax";
  private static final String MULTICAST_PROPERTY = "multicast";
  private static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";

  // Fallbacks used when a metric has no explicit per-metric setting.
  private static final String DEFAULT_UNITS = "";
  private static final String DEFAULT_SLOPE = "both";
  private static final int DEFAULT_TMAX = 60;
  private static final int DEFAULT_DMAX = 0;
  private static final int DEFAULT_PORT = 8649;
  private static final int BUFFER_SIZE = 1500; // as per libgmond.c
  private static final int DEFAULT_MULTICAST_TTL = 1;

  // Deliberately per-instance: the logger is named after the runtime class.
  private final Log LOG = LogFactory.getLog(this.getClass());

  /** Maps the Java class of a metric value to the type name sent on the wire. */
  private static final Map<Class<?>,String> typeTable =
      new HashMap<Class<?>,String>(5);

  static {
    typeTable.put(String.class, "string");
    typeTable.put(Byte.class, "int8");
    typeTable.put(Short.class, "int16");
    typeTable.put(Integer.class, "int32");
    // NOTE(review): Long is reported as "float" rather than an integer type;
    // this looks deliberate (presumably no 64-bit integer type on the
    // receiving side) -- confirm before changing.
    typeTable.put(Long.class, "float");
    typeTable.put(Float.class, "float");
  }

  /** Scratch buffer holding the XDR encoding of the metric being emitted. */
  protected byte[] buffer = new byte[BUFFER_SIZE];

  /** Index of the next free byte in {@link #buffer}. */
  protected int offset;

  /** Destinations that receive every emitted metric. */
  protected List<? extends SocketAddress> metricsServers;

  // Per-metric overrides read from the configuration, keyed by metric name.
  private Map<String,String> unitsTable;
  private Map<String,String> slopeTable;
  private Map<String,String> tmaxTable;
  private Map<String,String> dmaxTable;

  private boolean multicastEnabled;
  private int multicastTtl;

  /** Socket used for sending; stays null if creation failed in init(). */
  protected DatagramSocket datagramSocket;

  /** Creates a new instance of GangliaContext */
  @InterfaceAudience.Private
  public GangliaContext() {
  }

  /**
   * Reads the Ganglia settings for this context and opens the sending socket.
   *
   * <p>When the "multicast" attribute parses as true a
   * {@link MulticastSocket} is created and its time-to-live is set from
   * "multicast.ttl" (default {@value #DEFAULT_MULTICAST_TTL}); otherwise a
   * plain {@link DatagramSocket} is used. A failure to create the socket is
   * logged and not rethrown.
   *
   * @param contextName name of this context, passed on to the superclass
   * @param factory factory passed on to the superclass
   * @throws NumberFormatException if multicast is enabled and the
   *         "multicast.ttl" attribute is not a valid integer
   */
  @Override
  @InterfaceAudience.Private
  public void init(String contextName, ContextFactory factory) {
    super.init(contextName, factory);
    parseAndSetPeriod(PERIOD_PROPERTY);

    metricsServers =
        Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT);

    unitsTable = getAttributeTable(UNITS_PROPERTY);
    slopeTable = getAttributeTable(SLOPE_PROPERTY);
    tmaxTable = getAttributeTable(TMAX_PROPERTY);
    dmaxTable = getAttributeTable(DMAX_PROPERTY);
    multicastEnabled = Boolean.parseBoolean(getAttribute(MULTICAST_PROPERTY));
    String multicastTtlValue = getAttribute(MULTICAST_TTL_PROPERTY);
    if (multicastEnabled) {
      if (multicastTtlValue == null) {
        multicastTtl = DEFAULT_MULTICAST_TTL;
      } else {
        multicastTtl = Integer.parseInt(multicastTtlValue);
      }
    }

    try {
      if (multicastEnabled) {
        LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
        datagramSocket = new MulticastSocket();
        ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
      } else {
        datagramSocket = new DatagramSocket();
      }
    } catch (IOException e) {
      // Pass the exception as the throwable argument so the stack trace is
      // logged, not just its toString().
      LOG.error("Failed to create datagram socket for Ganglia", e);
    }
  }

  /**
   * method to close the datagram socket
   */
  @Override
  public void close() {
    super.close();
    if (datagramSocket != null) {
      datagramSocket.close();
    }
  }

  /**
   * Emits every metric of a record under the name
   * {@code contextName.[processName.]recordName.metricName}.
   *
   * <p>The {@code processName} component is inserted only for the "jvm"
   * context and only when the record carries a "processName" tag. Metrics
   * whose value class has no entry in the type table are skipped with a
   * warning.
   *
   * @param contextName name of the context the record belongs to
   * @param recordName name of the record
   * @param outRec record whose metrics are emitted
   * @throws IOException if sending a metric fails
   */
  @Override
  @InterfaceAudience.Private
  public void emitRecord(String contextName, String recordName,
      OutputRecord outRec)
      throws IOException {
    // Setup so that the records have the proper leader names so they are
    // unambiguous at the ganglia level, and this prevents a lot of rework
    StringBuilder sb = new StringBuilder();
    sb.append(contextName);
    sb.append('.');

    if (contextName.equals("jvm") && outRec.getTag("processName") != null) {
      sb.append(outRec.getTag("processName"));
      sb.append('.');
    }

    sb.append(recordName);
    sb.append('.');
    int sbBaseLen = sb.length();

    // emit each metric in turn
    for (String metricName : outRec.getMetricNames()) {
      Object metric = outRec.getMetric(metricName);
      String type = typeTable.get(metric.getClass());
      if (type != null) {
        sb.append(metricName);
        emitMetric(sb.toString(), type, metric.toString());
        // Rewind to the shared prefix so the next metric name can be appended.
        sb.setLength(sbBaseLen);
      } else {
        LOG.warn("Unknown metrics type: " + metric.getClass());
      }
    }
  }

  /**
   * Encodes one metric into {@link #buffer} and sends it to every server in
   * {@link #metricsServers}.
   *
   * @param name fully qualified metric name
   * @param type wire type name of the metric
   * @param value metric value rendered as a string
   * @throws IOException if a send fails, or if there is a server to send to
   *         but no socket is available
   */
  protected void emitMetric(String name, String type, String value)
      throws IOException {
    String units = getUnits(name);
    int slope = getSlope(name);
    int tmax = getTmax(name);
    int dmax = getDmax(name);

    offset = 0;
    xdr_int(0); // metric_user_defined
    xdr_string(type);
    xdr_string(name);
    xdr_string(value);
    xdr_string(units);
    xdr_int(slope);
    xdr_int(tmax);
    xdr_int(dmax);

    for (SocketAddress socketAddress : metricsServers) {
      if (datagramSocket == null) {
        // init() only logs a socket-creation failure, so report the missing
        // socket here as a checked error instead of a NullPointerException.
        throw new IOException("Ganglia datagram socket is not available;"
            + " cannot send metric " + name);
      }
      DatagramPacket packet =
          new DatagramPacket(buffer, offset, socketAddress);
      datagramSocket.send(packet);
    }
  }

  /**
   * Returns the units configured for a metric.
   *
   * @param metricName fully qualified metric name
   * @return the configured units, or the empty default when none are set
   */
  protected String getUnits(String metricName) {
    if (unitsTable == null) {
      return DEFAULT_UNITS;
    }
    String result = unitsTable.get(metricName);
    if (result == null) {
      result = DEFAULT_UNITS;
    }
    return result;
  }

  /**
   * Returns the numeric slope code for a metric.
   *
   * @param metricName fully qualified metric name
   * @return 0 when the configured slope is "zero", otherwise 3
   */
  protected int getSlope(String metricName) {
    String slopeString =
        (slopeTable == null) ? null : slopeTable.get(metricName);
    if (slopeString == null) {
      slopeString = DEFAULT_SLOPE;
    }
    return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
  }

  /**
   * Returns the tmax value for a metric.
   *
   * @param metricName fully qualified metric name
   * @return the configured value, or {@value #DEFAULT_TMAX} when none is set
   * @throws NumberFormatException if the configured value is not an integer
   */
  protected int getTmax(String metricName) {
    if (tmaxTable == null) {
      return DEFAULT_TMAX;
    }
    String tmaxString = tmaxTable.get(metricName);
    if (tmaxString == null) {
      return DEFAULT_TMAX;
    }
    else {
      return Integer.parseInt(tmaxString);
    }
  }

  /**
   * Returns the dmax value for a metric.
   *
   * @param metricName fully qualified metric name
   * @return the configured value, or {@value #DEFAULT_DMAX} when none is set
   * @throws NumberFormatException if the configured value is not an integer
   */
  protected int getDmax(String metricName) {
    if (dmaxTable == null) {
      return DEFAULT_DMAX;
    }
    String dmaxString = dmaxTable.get(metricName);
    if (dmaxString == null) {
      return DEFAULT_DMAX;
    }
    else {
      return Integer.parseInt(dmaxString);
    }
  }

  /**
   * Puts a string into the buffer by first writing the size of the string
   * as an int, followed by the bytes of the string, padded if necessary to
   * a multiple of 4.
   *
   * <p>The string is encoded as UTF-8 and the length written is the byte
   * count. No capacity check is made: data that does not fit in the buffer
   * results in an {@link IndexOutOfBoundsException}.
   *
   * @param s string to append; must not be null
   */
  protected void xdr_string(String s) {
    byte[] bytes = s.getBytes(Charsets.UTF_8);
    int len = bytes.length;
    xdr_int(len);
    System.arraycopy(bytes, 0, buffer, offset, len);
    offset += len;
    pad();
  }

  /**
   * Pads the buffer with zero bytes up to the nearest multiple of 4.
   */
  private void pad() {
    int newOffset = ((offset + 3) / 4) * 4;
    while (offset < newOffset) {
      buffer[offset++] = 0;
    }
  }

  /**
   * Puts an integer into the buffer as 4 bytes, big-endian.
   *
   * @param i value to append
   */
  protected void xdr_int(int i) {
    buffer[offset++] = (byte)((i >> 24) & 0xff);
    buffer[offset++] = (byte)((i >> 16) & 0xff);
    buffer[offset++] = (byte)((i >> 8) & 0xff);
    buffer[offset++] = (byte)(i & 0xff);
  }
}