001 /* 002 * GangliaContext.java 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package org.apache.hadoop.metrics.ganglia; 022 023 import java.io.IOException; 024 import java.net.DatagramPacket; 025 import java.net.DatagramSocket; 026 import java.net.SocketAddress; 027 import java.net.SocketException; 028 import java.util.HashMap; 029 import java.util.List; 030 import java.util.Map; 031 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 035 import org.apache.hadoop.classification.InterfaceAudience; 036 import org.apache.hadoop.classification.InterfaceStability; 037 import org.apache.hadoop.metrics.ContextFactory; 038 import org.apache.hadoop.metrics.spi.AbstractMetricsContext; 039 import org.apache.hadoop.metrics.spi.OutputRecord; 040 import org.apache.hadoop.metrics.spi.Util; 041 042 /** 043 * Context for sending metrics to Ganglia. 044 * 045 */ 046 @InterfaceAudience.Public 047 @InterfaceStability.Evolving 048 public class GangliaContext extends AbstractMetricsContext { 049 050 private static final String PERIOD_PROPERTY = "period"; 051 private static final String SERVERS_PROPERTY = "servers"; 052 private static final String UNITS_PROPERTY = "units"; 053 private static final String SLOPE_PROPERTY = "slope"; 054 private static final String TMAX_PROPERTY = "tmax"; 055 private static final String DMAX_PROPERTY = "dmax"; 056 057 private static final String DEFAULT_UNITS = ""; 058 private static final String DEFAULT_SLOPE = "both"; 059 private static final int DEFAULT_TMAX = 60; 060 private static final int DEFAULT_DMAX = 0; 061 private static final int DEFAULT_PORT = 8649; 062 private static final int BUFFER_SIZE = 1500; // as per libgmond.c 063 064 private final Log LOG = LogFactory.getLog(this.getClass()); 065 066 private static final Map<Class,String> typeTable = new HashMap<Class,String>(5); 067 068 static { 069 typeTable.put(String.class, "string"); 070 typeTable.put(Byte.class, "int8"); 071 typeTable.put(Short.class, "int16"); 072 typeTable.put(Integer.class, "int32"); 073 typeTable.put(Long.class, "float"); 074 typeTable.put(Float.class, "float"); 075 } 076 077 protected byte[] buffer = new byte[BUFFER_SIZE]; 078 protected int offset; 079 080 protected List<? extends SocketAddress> metricsServers; 081 private Map<String,String> unitsTable; 082 private Map<String,String> slopeTable; 083 private Map<String,String> tmaxTable; 084 private Map<String,String> dmaxTable; 085 086 protected DatagramSocket datagramSocket; 087 088 /** Creates a new instance of GangliaContext */ 089 @InterfaceAudience.Private 090 public GangliaContext() { 091 } 092 093 @InterfaceAudience.Private 094 public void init(String contextName, ContextFactory factory) { 095 super.init(contextName, factory); 096 parseAndSetPeriod(PERIOD_PROPERTY); 097 098 metricsServers = 099 Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT); 100 101 unitsTable = getAttributeTable(UNITS_PROPERTY); 102 slopeTable = getAttributeTable(SLOPE_PROPERTY); 103 tmaxTable = getAttributeTable(TMAX_PROPERTY); 104 dmaxTable = getAttributeTable(DMAX_PROPERTY); 105 106 try { 107 datagramSocket = new DatagramSocket(); 108 } 109 catch (SocketException se) { 110 se.printStackTrace(); 111 } 112 } 113 114 /** 115 * method to close the datagram socket 116 */ 117 @Override 118 public void close() { 119 super.close(); 120 if (datagramSocket != null) { 121 datagramSocket.close(); 122 } 123 } 124 125 @InterfaceAudience.Private 126 public void emitRecord(String contextName, String recordName, 127 OutputRecord outRec) 128 throws IOException { 129 // Setup so that the records have the proper leader names so they are 130 // unambiguous at the ganglia level, and this prevents a lot of rework 131 StringBuilder sb = new StringBuilder(); 132 sb.append(contextName); 133 sb.append('.'); 134 135 if (contextName.equals("jvm") && outRec.getTag("processName") != null) { 136 sb.append(outRec.getTag("processName")); 137 sb.append('.'); 138 } 139 140 sb.append(recordName); 141 sb.append('.'); 142 int sbBaseLen = sb.length(); 143 144 // emit each metric in turn 145 for (String metricName : outRec.getMetricNames()) { 146 Object metric = outRec.getMetric(metricName); 147 String type = typeTable.get(metric.getClass()); 148 if (type != null) { 149 sb.append(metricName); 150 emitMetric(sb.toString(), type, metric.toString()); 151 sb.setLength(sbBaseLen); 152 } else { 153 LOG.warn("Unknown metrics type: " + metric.getClass()); 154 } 155 } 156 } 157 158 protected void emitMetric(String name, String type, String value) 159 throws IOException { 160 String units = getUnits(name); 161 int slope = getSlope(name); 162 int tmax = getTmax(name); 163 int dmax = getDmax(name); 164 165 offset = 0; 166 xdr_int(0); // metric_user_defined 167 xdr_string(type); 168 xdr_string(name); 169 xdr_string(value); 170 xdr_string(units); 171 xdr_int(slope); 172 xdr_int(tmax); 173 xdr_int(dmax); 174 175 for (SocketAddress socketAddress : metricsServers) { 176 DatagramPacket packet = 177 new DatagramPacket(buffer, offset, socketAddress); 178 datagramSocket.send(packet); 179 } 180 } 181 182 protected String getUnits(String metricName) { 183 String result = unitsTable.get(metricName); 184 if (result == null) { 185 result = DEFAULT_UNITS; 186 } 187 return result; 188 } 189 190 protected int getSlope(String metricName) { 191 String slopeString = slopeTable.get(metricName); 192 if (slopeString == null) { 193 slopeString = DEFAULT_SLOPE; 194 } 195 return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c 196 } 197 198 protected int getTmax(String metricName) { 199 if (tmaxTable == null) { 200 return DEFAULT_TMAX; 201 } 202 String tmaxString = tmaxTable.get(metricName); 203 if (tmaxString == null) { 204 return DEFAULT_TMAX; 205 } 206 else { 207 return Integer.parseInt(tmaxString); 208 } 209 } 210 211 protected int getDmax(String metricName) { 212 String dmaxString = dmaxTable.get(metricName); 213 if (dmaxString == null) { 214 return DEFAULT_DMAX; 215 } 216 else { 217 return Integer.parseInt(dmaxString); 218 } 219 } 220 221 /** 222 * Puts a string into the buffer by first writing the size of the string 223 * as an int, followed by the bytes of the string, padded if necessary to 224 * a multiple of 4. 225 */ 226 protected void xdr_string(String s) { 227 byte[] bytes = s.getBytes(); 228 int len = bytes.length; 229 xdr_int(len); 230 System.arraycopy(bytes, 0, buffer, offset, len); 231 offset += len; 232 pad(); 233 } 234 235 /** 236 * Pads the buffer with zero bytes up to the nearest multiple of 4. 237 */ 238 private void pad() { 239 int newOffset = ((offset + 3) / 4) * 4; 240 while (offset < newOffset) { 241 buffer[offset++] = 0; 242 } 243 } 244 245 /** 246 * Puts an integer into the buffer as 4 bytes, big-endian. 247 */ 248 protected void xdr_int(int i) { 249 buffer[offset++] = (byte)((i >> 24) & 0xff); 250 buffer[offset++] = (byte)((i >> 16) & 0xff); 251 buffer[offset++] = (byte)((i >> 8) & 0xff); 252 buffer[offset++] = (byte)(i & 0xff); 253 } 254 }