/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink.ganglia;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.util.Servers;
import org.apache.hadoop.net.DNS;

/**
 * This is the base class for Ganglia sink classes using metrics2. Much of the
 * code has been derived from org.apache.hadoop.metrics.ganglia.GangliaContext.
 * As per the documentation, sink implementations do not have to worry about
 * thread safety. Hence this code is not written to be thread-safe and should
 * be revisited if that assumption changes in the future.
 */
public abstract class AbstractGangliaSink implements MetricsSink {
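
  /*
   * How a concrete sink typically drives this base class (an illustrative
   * sketch, not code from this file; the shipped subclasses such as
   * GangliaSink30 follow this general shape):
   *
   *   public void putMetrics(MetricsRecord record) {
   *     for (AbstractMetric metric : record.metrics()) {
   *       try {
   *         // serialize one Ganglia message into the shared buffer
   *         xdr_string(getHostName());
   *         xdr_string(metric.name());
   *         // ... remaining protocol fields, then send and reset the offset
   *         emitToGangliaHosts();
   *       } catch (IOException e) {
   *         throw new MetricsException("Error emitting metrics", e);
   *       }
   *     }
   *   }
   */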

  public final Log LOG = LogFactory.getLog(this.getClass());

  /*
   * Output of "gmetric --help" showing allowable values
   * -t, --type=STRING
   *     Either string|int8|uint8|int16|uint16|int32|uint32|float|double
   * -u, --units=STRING Unit of measure for the value e.g. Kilobytes, Celcius
   *     (default='')
   * -s, --slope=STRING Either zero|positive|negative|both
   *     (default='both')
   * -x, --tmax=INT The maximum time in seconds between gmetric calls
   *     (default='60')
   */
  public static final String DEFAULT_UNITS = "";
  public static final int DEFAULT_TMAX = 60;
  public static final int DEFAULT_DMAX = 0;
  public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
  public static final int DEFAULT_PORT = 8649;
  public static final boolean DEFAULT_MULTICAST_ENABLED = false;
  public static final int DEFAULT_MULTICAST_TTL = 1;
  public static final String SERVERS_PROPERTY = "servers";
  public static final String MULTICAST_ENABLED_PROPERTY = "multicast";
  public static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
  public static final int BUFFER_SIZE = 1500; // as per libgmond.c
  public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
  public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
  public static final String EQUAL = "=";

  private String hostName = "UNKNOWN.example.com";
  private DatagramSocket datagramSocket;
  private List<? extends SocketAddress> metricsServers;
  private boolean multicastEnabled;
  private int multicastTtl;
  private byte[] buffer = new byte[BUFFER_SIZE];
  private int offset;
  private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;

  /**
   * Used for visiting Metrics
   */
  protected final GangliaMetricVisitor gangliaMetricVisitor =
      new GangliaMetricVisitor();

  private SubsetConfiguration conf;
  private Map<String, GangliaConf> gangliaConfMap;
  private final GangliaConf DEFAULT_GANGLIA_CONF = new GangliaConf();

  /**
   * Ganglia slope values, which equal the ordinal
   */
  public enum GangliaSlope {
    zero,       // 0
    positive,   // 1
    negative,   // 2
    both        // 3
  }

  /**
   * Enum for the various types of Ganglia conf
   */
  public enum GangliaConfType {
    slope, units, dmax, tmax
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.hadoop.metrics2.MetricsPlugin#init(org.apache.commons.configuration
   * .SubsetConfiguration)
   */
  public void init(SubsetConfiguration conf) {
    LOG.debug("Initializing the GangliaSink for Ganglia metrics.");

    this.conf = conf;

    // Take the hostname from the DNS class.
    if (conf.getString("slave.host.name") != null) {
      hostName = conf.getString("slave.host.name");
    } else {
      try {
        hostName = DNS.getDefaultHost(
            conf.getString("dfs.datanode.dns.interface", "default"),
            conf.getString("dfs.datanode.dns.nameserver", "default"));
      } catch (UnknownHostException uhe) {
        LOG.error(uhe);
        hostName = "UNKNOWN.example.com";
      }
    }

    // load the ganglia servers from properties
    metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
        DEFAULT_PORT);
    multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY,
        DEFAULT_MULTICAST_ENABLED);
    multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL);

    // extract the Ganglia conf per metric
    gangliaConfMap = new HashMap<String, GangliaConf>();
    loadGangliaConf(GangliaConfType.units);
    loadGangliaConf(GangliaConfType.tmax);
    loadGangliaConf(GangliaConfType.dmax);
    loadGangliaConf(GangliaConfType.slope);

    try {
      if (multicastEnabled) {
        LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
        datagramSocket = new MulticastSocket();
        ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
      } else {
        datagramSocket = new DatagramSocket();
      }
    } catch (IOException e) {
      LOG.error(e);
    }

    // see if sparse metrics are supported; default is false
    supportSparseMetrics = conf.getBoolean(SUPPORT_SPARSE_METRICS_PROPERTY,
        SUPPORT_SPARSE_METRICS_DEFAULT);
  }
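
  /*
   * For reference, a hadoop-metrics2.properties fragment that init() above
   * would consume. This is an illustrative example, not configuration shipped
   * with this class: the "namenode" prefix and the GangliaSink31 sink class
   * are assumptions about a particular deployment.
   *
   *   namenode.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
   *   namenode.sink.ganglia.servers=gmond.example.com:8649
   *   namenode.sink.ganglia.multicast=false
   *   namenode.sink.ganglia.supportsparse=true
   *   namenode.sink.ganglia.slope=jvm.metrics.gcCount=zero,jvm.metrics.memHeapUsedM=both
   *   namenode.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
   */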

  /*
   * (non-Javadoc)
   *
   * @see org.apache.hadoop.metrics2.MetricsSink#flush()
   */
  public void flush() {
    // nothing to do as we are not buffering data
  }

  // Load the configurations for a conf type
  private void loadGangliaConf(GangliaConfType gtype) {
    String[] propertyarr = conf.getStringArray(gtype.name());
    if (propertyarr != null && propertyarr.length > 0) {
      for (String metricNValue : propertyarr) {
        String[] metricNValueArr = metricNValue.split(EQUAL);
        if (metricNValueArr.length != 2 || metricNValueArr[0].length() == 0) {
          LOG.error("Invalid propertylist for " + gtype.name());
          continue; // skip the malformed entry instead of failing below
        }

        String metricName = metricNValueArr[0].trim();
        String metricValue = metricNValueArr[1].trim();
        GangliaConf gconf = gangliaConfMap.get(metricName);
        if (gconf == null) {
          gconf = new GangliaConf();
          gangliaConfMap.put(metricName, gconf);
        }

        switch (gtype) {
        case units:
          gconf.setUnits(metricValue);
          break;
        case dmax:
          gconf.setDmax(Integer.parseInt(metricValue));
          break;
        case tmax:
          gconf.setTmax(Integer.parseInt(metricValue));
          break;
        case slope:
          gconf.setSlope(GangliaSlope.valueOf(metricValue));
          break;
        }
      }
    }
  }

  /**
   * Lookup GangliaConf from cache. If not found, return default values.
   *
   * @param metricName the metric to look up
   * @return the looked up GangliaConf, or the defaults if none was configured
   */
  protected GangliaConf getGangliaConfForMetric(String metricName) {
    GangliaConf gconf = gangliaConfMap.get(metricName);

    return gconf != null ? gconf : DEFAULT_GANGLIA_CONF;
  }

  /**
   * @return the hostName
   */
  protected String getHostName() {
    return hostName;
  }

  /**
   * Puts a string into the buffer by first writing the size of the string as
   * an int, followed by the bytes of the string, padded if necessary to a
   * multiple of 4.
   * @param s the string to be written to the buffer at the offset location
   */
  protected void xdr_string(String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    int len = bytes.length;
    xdr_int(len);
    System.arraycopy(bytes, 0, buffer, offset, len);
    offset += len;
    pad();
  }

  // Pads the buffer with zero bytes up to the nearest multiple of 4.
  private void pad() {
    int newOffset = ((offset + 3) / 4) * 4;
    while (offset < newOffset) {
      buffer[offset++] = 0;
    }
  }
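
  /*
   * For example, xdr_string("abc") leaves these eight bytes in the buffer:
   *   00 00 00 03   string length written by xdr_int() (big-endian)
   *   61 62 63      UTF-8 bytes of "abc"
   *   00            zero byte appended by pad() to reach a multiple of 4
   */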

  /**
   * Puts an integer into the buffer as 4 bytes, big-endian.
   */
  protected void xdr_int(int i) {
    buffer[offset++] = (byte) ((i >> 24) & 0xff);
    buffer[offset++] = (byte) ((i >> 16) & 0xff);
    buffer[offset++] = (byte) ((i >> 8) & 0xff);
    buffer[offset++] = (byte) (i & 0xff);
  }

  /**
   * Sends Ganglia metrics to the configured hosts
   * @throws IOException if sending the datagram fails
   */
  protected void emitToGangliaHosts() throws IOException {
    try {
      for (SocketAddress socketAddress : metricsServers) {
        if (!(socketAddress instanceof InetSocketAddress)) {
          throw new IllegalArgumentException("Unsupported Address type");
        }
        InetSocketAddress inetAddress = (InetSocketAddress) socketAddress;
        if (inetAddress.isUnresolved()) {
          throw new UnknownHostException("Unresolved host: " + inetAddress);
        }
        DatagramPacket packet =
            new DatagramPacket(buffer, offset, socketAddress);
        datagramSocket.send(packet);
      }
    } finally {
      // reset the buffer for the next metric to be built
      offset = 0;
    }
  }

  /**
   * Reset the buffer for the next metric to be built
   */
  void resetBuffer() {
    offset = 0;
  }

  /**
   * @return whether sparse metrics are supported
   */
  protected boolean isSupportSparseMetrics() {
    return supportSparseMetrics;
  }

  /**
   * Used only by unit tests
   * @param datagramSocket the datagramSocket to set
   */
  void setDatagramSocket(DatagramSocket datagramSocket) {
    this.datagramSocket = datagramSocket;
  }

  /**
   * Used only by unit tests
   * @return the datagramSocket for this sink
   */
  DatagramSocket getDatagramSocket() {
    return datagramSocket;
  }
}