/*
 * GangliaContext.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics.ganglia;

import java.io.IOException;
import java.net.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;
import org.apache.hadoop.metrics.spi.Util;

/**
 * Context for sending metrics to Ganglia.
 *
 * @deprecated Use {@link org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30}
 * instead.
 */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class GangliaContext extends AbstractMetricsContext {

  private static final String PERIOD_PROPERTY = "period";
  private static final String SERVERS_PROPERTY = "servers";
  private static final String UNITS_PROPERTY = "units";
  private static final String SLOPE_PROPERTY = "slope";
  private static final String TMAX_PROPERTY = "tmax";
  private static final String DMAX_PROPERTY = "dmax";
  private static final String MULTICAST_PROPERTY = "multicast";
  private static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";

  private static final String DEFAULT_UNITS = "";
  private static final String DEFAULT_SLOPE = "both";
  private static final int DEFAULT_TMAX = 60;
  private static final int DEFAULT_DMAX = 0;
  private static final int DEFAULT_PORT = 8649;
  private static final int BUFFER_SIZE = 1500; // as per libgmond.c
  private static final int DEFAULT_MULTICAST_TTL = 1;

  private final Log LOG = LogFactory.getLog(this.getClass());

  private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);

  static {
    typeTable.put(String.class, "string");
    typeTable.put(Byte.class, "int8");
    typeTable.put(Short.class, "int16");
    typeTable.put(Integer.class, "int32");
    typeTable.put(Long.class, "float");
    typeTable.put(Float.class, "float");
  }

  protected byte[] buffer = new byte[BUFFER_SIZE];
  protected int offset;

  protected List<? extends SocketAddress> metricsServers;
  private Map<String,String> unitsTable;
  private Map<String,String> slopeTable;
  private Map<String,String> tmaxTable;
  private Map<String,String> dmaxTable;
  private boolean multicastEnabled;
  private int multicastTtl;

  protected DatagramSocket datagramSocket;

  /** Creates a new instance of GangliaContext */
  @InterfaceAudience.Private
  public GangliaContext() {
  }
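  /**
   * Reads this context's attributes from the {@link ContextFactory} and opens
   * the datagram (or multicast) socket used to push metrics to gmond. The
   * attributes are normally supplied through hadoop-metrics.properties,
   * prefixed with the context name; the entries below are illustrative values
   * only, not part of any shipped configuration:
   *
   * <pre>
   * dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
   * dfs.period=10
   * dfs.servers=gmond.example.com:8649
   * dfs.multicast=true
   * dfs.multicast.ttl=1
   * </pre>
   *
   * Per-metric "units", "slope", "tmax" and "dmax" tables may be supplied the
   * same way.
   */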
  @Override
  @InterfaceAudience.Private
  public void init(String contextName, ContextFactory factory) {
    super.init(contextName, factory);
    parseAndSetPeriod(PERIOD_PROPERTY);

    metricsServers =
      Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT);

    unitsTable = getAttributeTable(UNITS_PROPERTY);
    slopeTable = getAttributeTable(SLOPE_PROPERTY);
    tmaxTable = getAttributeTable(TMAX_PROPERTY);
    dmaxTable = getAttributeTable(DMAX_PROPERTY);
    multicastEnabled = Boolean.parseBoolean(getAttribute(MULTICAST_PROPERTY));
    String multicastTtlValue = getAttribute(MULTICAST_TTL_PROPERTY);
    if (multicastEnabled) {
      if (multicastTtlValue == null) {
        multicastTtl = DEFAULT_MULTICAST_TTL;
      } else {
        multicastTtl = Integer.parseInt(multicastTtlValue);
      }
    }

    try {
      if (multicastEnabled) {
        LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
        datagramSocket = new MulticastSocket();
        ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
      } else {
        datagramSocket = new DatagramSocket();
      }
    } catch (IOException e) {
      LOG.error(e);
    }
  }

  /**
   * method to close the datagram socket
   */
  @Override
  public void close() {
    super.close();
    if (datagramSocket != null) {
      datagramSocket.close();
    }
  }

  @Override
  @InterfaceAudience.Private
  public void emitRecord(String contextName, String recordName,
      OutputRecord outRec)
      throws IOException {
    // Setup so that the records have the proper leader names so they are
    // unambiguous at the ganglia level, and this prevents a lot of rework
    StringBuilder sb = new StringBuilder();
    sb.append(contextName);
    sb.append('.');

    if (contextName.equals("jvm") && outRec.getTag("processName") != null) {
      sb.append(outRec.getTag("processName"));
      sb.append('.');
    }

    sb.append(recordName);
    sb.append('.');
    int sbBaseLen = sb.length();

    // emit each metric in turn
    for (String metricName : outRec.getMetricNames()) {
      Object metric = outRec.getMetric(metricName);
      String type = typeTable.get(metric.getClass());
      if (type != null) {
        sb.append(metricName);
        emitMetric(sb.toString(), type, metric.toString());
        sb.setLength(sbBaseLen);
      } else {
        LOG.warn("Unknown metrics type: " + metric.getClass());
      }
    }
  }
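  /**
   * Encodes one metric as a gmetric-style XDR message and sends it to each of
   * the configured metrics servers. The layout written below is: an int 0
   * (metric_user_defined), the strings type, name, value and units, then the
   * ints slope, tmax and dmax; the encoded message must fit within the
   * BUFFER_SIZE byte buffer.
   */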
  protected void emitMetric(String name, String type, String value)
      throws IOException {
    String units = getUnits(name);
    int slope = getSlope(name);
    int tmax = getTmax(name);
    int dmax = getDmax(name);

    offset = 0;
    xdr_int(0); // metric_user_defined
    xdr_string(type);
    xdr_string(name);
    xdr_string(value);
    xdr_string(units);
    xdr_int(slope);
    xdr_int(tmax);
    xdr_int(dmax);

    for (SocketAddress socketAddress : metricsServers) {
      DatagramPacket packet =
        new DatagramPacket(buffer, offset, socketAddress);
      datagramSocket.send(packet);
    }
  }

  protected String getUnits(String metricName) {
    String result = unitsTable.get(metricName);
    if (result == null) {
      result = DEFAULT_UNITS;
    }
    return result;
  }

  protected int getSlope(String metricName) {
    String slopeString = slopeTable.get(metricName);
    if (slopeString == null) {
      slopeString = DEFAULT_SLOPE;
    }
    return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
  }

  protected int getTmax(String metricName) {
    if (tmaxTable == null) {
      return DEFAULT_TMAX;
    }
    String tmaxString = tmaxTable.get(metricName);
    if (tmaxString == null) {
      return DEFAULT_TMAX;
    } else {
      return Integer.parseInt(tmaxString);
    }
  }

  protected int getDmax(String metricName) {
    String dmaxString = dmaxTable.get(metricName);
    if (dmaxString == null) {
      return DEFAULT_DMAX;
    } else {
      return Integer.parseInt(dmaxString);
    }
  }

  /**
   * Puts a string into the buffer by first writing the size of the string
   * as an int, followed by the bytes of the string, padded if necessary to
   * a multiple of 4.
   */
  protected void xdr_string(String s) {
    byte[] bytes = s.getBytes(Charsets.UTF_8);
    int len = bytes.length;
    xdr_int(len);
    System.arraycopy(bytes, 0, buffer, offset, len);
    offset += len;
    pad();
  }

  /**
   * Pads the buffer with zero bytes up to the nearest multiple of 4.
   */
  private void pad() {
    int newOffset = ((offset + 3) / 4) * 4;
    while (offset < newOffset) {
      buffer[offset++] = 0;
    }
  }

  /**
   * Puts an integer into the buffer as 4 bytes, big-endian.
   */
  protected void xdr_int(int i) {
    buffer[offset++] = (byte)((i >> 24) & 0xff);
    buffer[offset++] = (byte)((i >> 16) & 0xff);
    buffer[offset++] = (byte)((i >> 8) & 0xff);
    buffer[offset++] = (byte)(i & 0xff);
  }
}