001 /*
002 * GangliaContext.java
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021 package org.apache.hadoop.metrics.ganglia;
022
023 import java.io.IOException;
024 import java.net.DatagramPacket;
025 import java.net.DatagramSocket;
026 import java.net.SocketAddress;
027 import java.net.SocketException;
028 import java.util.HashMap;
029 import java.util.List;
030 import java.util.Map;
031
032 import org.apache.commons.logging.Log;
033 import org.apache.commons.logging.LogFactory;
034
035 import org.apache.hadoop.classification.InterfaceAudience;
036 import org.apache.hadoop.classification.InterfaceStability;
037 import org.apache.hadoop.metrics.ContextFactory;
038 import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
039 import org.apache.hadoop.metrics.spi.OutputRecord;
040 import org.apache.hadoop.metrics.spi.Util;
041
042 /**
043 * Context for sending metrics to Ganglia.
044 *
045 */
046 @InterfaceAudience.Public
047 @InterfaceStability.Evolving
048 public class GangliaContext extends AbstractMetricsContext {
049
050 private static final String PERIOD_PROPERTY = "period";
051 private static final String SERVERS_PROPERTY = "servers";
052 private static final String UNITS_PROPERTY = "units";
053 private static final String SLOPE_PROPERTY = "slope";
054 private static final String TMAX_PROPERTY = "tmax";
055 private static final String DMAX_PROPERTY = "dmax";
056
057 private static final String DEFAULT_UNITS = "";
058 private static final String DEFAULT_SLOPE = "both";
059 private static final int DEFAULT_TMAX = 60;
060 private static final int DEFAULT_DMAX = 0;
061 private static final int DEFAULT_PORT = 8649;
062 private static final int BUFFER_SIZE = 1500; // as per libgmond.c
063
064 private final Log LOG = LogFactory.getLog(this.getClass());
065
066 private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
067
068 static {
069 typeTable.put(String.class, "string");
070 typeTable.put(Byte.class, "int8");
071 typeTable.put(Short.class, "int16");
072 typeTable.put(Integer.class, "int32");
073 typeTable.put(Long.class, "float");
074 typeTable.put(Float.class, "float");
075 }
076
077 protected byte[] buffer = new byte[BUFFER_SIZE];
078 protected int offset;
079
080 protected List<? extends SocketAddress> metricsServers;
081 private Map<String,String> unitsTable;
082 private Map<String,String> slopeTable;
083 private Map<String,String> tmaxTable;
084 private Map<String,String> dmaxTable;
085
086 protected DatagramSocket datagramSocket;
087
088 /** Creates a new instance of GangliaContext */
089 @InterfaceAudience.Private
090 public GangliaContext() {
091 }
092
093 @InterfaceAudience.Private
094 public void init(String contextName, ContextFactory factory) {
095 super.init(contextName, factory);
096 parseAndSetPeriod(PERIOD_PROPERTY);
097
098 metricsServers =
099 Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT);
100
101 unitsTable = getAttributeTable(UNITS_PROPERTY);
102 slopeTable = getAttributeTable(SLOPE_PROPERTY);
103 tmaxTable = getAttributeTable(TMAX_PROPERTY);
104 dmaxTable = getAttributeTable(DMAX_PROPERTY);
105
106 try {
107 datagramSocket = new DatagramSocket();
108 }
109 catch (SocketException se) {
110 se.printStackTrace();
111 }
112 }
113
114 /**
115 * method to close the datagram socket
116 */
117 @Override
118 public void close() {
119 super.close();
120 if (datagramSocket != null) {
121 datagramSocket.close();
122 }
123 }
124
125 @InterfaceAudience.Private
126 public void emitRecord(String contextName, String recordName,
127 OutputRecord outRec)
128 throws IOException {
129 // Setup so that the records have the proper leader names so they are
130 // unambiguous at the ganglia level, and this prevents a lot of rework
131 StringBuilder sb = new StringBuilder();
132 sb.append(contextName);
133 sb.append('.');
134
135 if (contextName.equals("jvm") && outRec.getTag("processName") != null) {
136 sb.append(outRec.getTag("processName"));
137 sb.append('.');
138 }
139
140 sb.append(recordName);
141 sb.append('.');
142 int sbBaseLen = sb.length();
143
144 // emit each metric in turn
145 for (String metricName : outRec.getMetricNames()) {
146 Object metric = outRec.getMetric(metricName);
147 String type = typeTable.get(metric.getClass());
148 if (type != null) {
149 sb.append(metricName);
150 emitMetric(sb.toString(), type, metric.toString());
151 sb.setLength(sbBaseLen);
152 } else {
153 LOG.warn("Unknown metrics type: " + metric.getClass());
154 }
155 }
156 }
157
158 protected void emitMetric(String name, String type, String value)
159 throws IOException {
160 String units = getUnits(name);
161 int slope = getSlope(name);
162 int tmax = getTmax(name);
163 int dmax = getDmax(name);
164
165 offset = 0;
166 xdr_int(0); // metric_user_defined
167 xdr_string(type);
168 xdr_string(name);
169 xdr_string(value);
170 xdr_string(units);
171 xdr_int(slope);
172 xdr_int(tmax);
173 xdr_int(dmax);
174
175 for (SocketAddress socketAddress : metricsServers) {
176 DatagramPacket packet =
177 new DatagramPacket(buffer, offset, socketAddress);
178 datagramSocket.send(packet);
179 }
180 }
181
182 protected String getUnits(String metricName) {
183 String result = unitsTable.get(metricName);
184 if (result == null) {
185 result = DEFAULT_UNITS;
186 }
187 return result;
188 }
189
190 protected int getSlope(String metricName) {
191 String slopeString = slopeTable.get(metricName);
192 if (slopeString == null) {
193 slopeString = DEFAULT_SLOPE;
194 }
195 return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
196 }
197
198 protected int getTmax(String metricName) {
199 if (tmaxTable == null) {
200 return DEFAULT_TMAX;
201 }
202 String tmaxString = tmaxTable.get(metricName);
203 if (tmaxString == null) {
204 return DEFAULT_TMAX;
205 }
206 else {
207 return Integer.parseInt(tmaxString);
208 }
209 }
210
211 protected int getDmax(String metricName) {
212 String dmaxString = dmaxTable.get(metricName);
213 if (dmaxString == null) {
214 return DEFAULT_DMAX;
215 }
216 else {
217 return Integer.parseInt(dmaxString);
218 }
219 }
220
221 /**
222 * Puts a string into the buffer by first writing the size of the string
223 * as an int, followed by the bytes of the string, padded if necessary to
224 * a multiple of 4.
225 */
226 protected void xdr_string(String s) {
227 byte[] bytes = s.getBytes();
228 int len = bytes.length;
229 xdr_int(len);
230 System.arraycopy(bytes, 0, buffer, offset, len);
231 offset += len;
232 pad();
233 }
234
235 /**
236 * Pads the buffer with zero bytes up to the nearest multiple of 4.
237 */
238 private void pad() {
239 int newOffset = ((offset + 3) / 4) * 4;
240 while (offset < newOffset) {
241 buffer[offset++] = 0;
242 }
243 }
244
245 /**
246 * Puts an integer into the buffer as 4 bytes, big-endian.
247 */
248 protected void xdr_int(int i) {
249 buffer[offset++] = (byte)((i >> 24) & 0xff);
250 buffer[offset++] = (byte)((i >> 16) & 0xff);
251 buffer[offset++] = (byte)((i >> 8) & 0xff);
252 buffer[offset++] = (byte)(i & 0xff);
253 }
254 }