001/*
002 * GangliaContext.java
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package org.apache.hadoop.metrics.ganglia;
022
023import java.io.IOException;
024import java.net.DatagramPacket;
025import java.net.DatagramSocket;
026import java.net.SocketAddress;
027import java.net.SocketException;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceStability;
037import org.apache.hadoop.metrics.ContextFactory;
038import org.apache.hadoop.metrics.MetricsException;
039import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
040import org.apache.hadoop.metrics.spi.OutputRecord;
041import org.apache.hadoop.metrics.spi.Util;
042
043/**
044 * Context for sending metrics to Ganglia.
045 * 
046 */
047@InterfaceAudience.Public
048@InterfaceStability.Evolving
049public class GangliaContext extends AbstractMetricsContext {
050    
051  private static final String PERIOD_PROPERTY = "period";
052  private static final String SERVERS_PROPERTY = "servers";
053  private static final String UNITS_PROPERTY = "units";
054  private static final String SLOPE_PROPERTY = "slope";
055  private static final String TMAX_PROPERTY = "tmax";
056  private static final String DMAX_PROPERTY = "dmax";
057    
058  private static final String DEFAULT_UNITS = "";
059  private static final String DEFAULT_SLOPE = "both";
060  private static final int DEFAULT_TMAX = 60;
061  private static final int DEFAULT_DMAX = 0;
062  private static final int DEFAULT_PORT = 8649;
063  private static final int BUFFER_SIZE = 1500;       // as per libgmond.c
064
065  private final Log LOG = LogFactory.getLog(this.getClass());    
066
067  private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
068    
069  static {
070    typeTable.put(String.class, "string");
071    typeTable.put(Byte.class, "int8");
072    typeTable.put(Short.class, "int16");
073    typeTable.put(Integer.class, "int32");
074    typeTable.put(Long.class, "float");
075    typeTable.put(Float.class, "float");
076  }
077    
078  protected byte[] buffer = new byte[BUFFER_SIZE];
079  protected int offset;
080    
081  protected List<? extends SocketAddress> metricsServers;
082  private Map<String,String> unitsTable;
083  private Map<String,String> slopeTable;
084  private Map<String,String> tmaxTable;
085  private Map<String,String> dmaxTable;
086    
087  protected DatagramSocket datagramSocket;
088    
089  /** Creates a new instance of GangliaContext */
090  @InterfaceAudience.Private
091  public GangliaContext() {
092  }
093    
094  @InterfaceAudience.Private
095  public void init(String contextName, ContextFactory factory) {
096    super.init(contextName, factory);
097    parseAndSetPeriod(PERIOD_PROPERTY);
098        
099    metricsServers = 
100      Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT); 
101        
102    unitsTable = getAttributeTable(UNITS_PROPERTY);
103    slopeTable = getAttributeTable(SLOPE_PROPERTY);
104    tmaxTable  = getAttributeTable(TMAX_PROPERTY);
105    dmaxTable  = getAttributeTable(DMAX_PROPERTY);
106        
107    try {
108      datagramSocket = new DatagramSocket();
109    }
110    catch (SocketException se) {
111      se.printStackTrace();
112    }
113  }
114
115    /**
116   * method to close the datagram socket
117   */
118  @Override
119  public void close() {
120    super.close();
121    if (datagramSocket != null) {
122      datagramSocket.close();
123    }
124  }
125  
126  @InterfaceAudience.Private
127  public void emitRecord(String contextName, String recordName,
128    OutputRecord outRec) 
129  throws IOException {
130    // Setup so that the records have the proper leader names so they are
131    // unambiguous at the ganglia level, and this prevents a lot of rework
132    StringBuilder sb = new StringBuilder();
133    sb.append(contextName);
134    sb.append('.');
135
136    if (contextName.equals("jvm") && outRec.getTag("processName") != null) {
137      sb.append(outRec.getTag("processName"));
138      sb.append('.');
139    }
140
141    sb.append(recordName);
142    sb.append('.');
143    int sbBaseLen = sb.length();
144
145    // emit each metric in turn
146    for (String metricName : outRec.getMetricNames()) {
147      Object metric = outRec.getMetric(metricName);
148      String type = typeTable.get(metric.getClass());
149      if (type != null) {
150        sb.append(metricName);
151        emitMetric(sb.toString(), type, metric.toString());
152        sb.setLength(sbBaseLen);
153      } else {
154        LOG.warn("Unknown metrics type: " + metric.getClass());
155      }
156    }
157  }
158    
159  protected void emitMetric(String name, String type,  String value) 
160  throws IOException {
161    String units = getUnits(name);
162    int slope = getSlope(name);
163    int tmax = getTmax(name);
164    int dmax = getDmax(name);
165        
166    offset = 0;
167    xdr_int(0);             // metric_user_defined
168    xdr_string(type);
169    xdr_string(name);
170    xdr_string(value);
171    xdr_string(units);
172    xdr_int(slope);
173    xdr_int(tmax);
174    xdr_int(dmax);
175        
176    for (SocketAddress socketAddress : metricsServers) {
177      DatagramPacket packet = 
178        new DatagramPacket(buffer, offset, socketAddress);
179      datagramSocket.send(packet);
180    }
181  }
182    
183  protected String getUnits(String metricName) {
184    String result = unitsTable.get(metricName);
185    if (result == null) {
186      result = DEFAULT_UNITS;
187    }
188    return result;
189  }
190    
191  protected int getSlope(String metricName) {
192    String slopeString = slopeTable.get(metricName);
193    if (slopeString == null) {
194      slopeString = DEFAULT_SLOPE; 
195    }
196    return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
197  }
198    
199  protected int getTmax(String metricName) {
200    if (tmaxTable == null) {
201      return DEFAULT_TMAX;
202    }
203    String tmaxString = tmaxTable.get(metricName);
204    if (tmaxString == null) {
205      return DEFAULT_TMAX;
206    }
207    else {
208      return Integer.parseInt(tmaxString);
209    }
210  }
211    
212  protected int getDmax(String metricName) {
213    String dmaxString = dmaxTable.get(metricName);
214    if (dmaxString == null) {
215      return DEFAULT_DMAX;
216    }
217    else {
218      return Integer.parseInt(dmaxString);
219    }
220  }
221    
222  /**
223   * Puts a string into the buffer by first writing the size of the string
224   * as an int, followed by the bytes of the string, padded if necessary to
225   * a multiple of 4.
226   */
227  protected void xdr_string(String s) {
228    byte[] bytes = s.getBytes();
229    int len = bytes.length;
230    xdr_int(len);
231    System.arraycopy(bytes, 0, buffer, offset, len);
232    offset += len;
233    pad();
234  }
235
236  /**
237   * Pads the buffer with zero bytes up to the nearest multiple of 4.
238   */
239  private void pad() {
240    int newOffset = ((offset + 3) / 4) * 4;
241    while (offset < newOffset) {
242      buffer[offset++] = 0;
243    }
244  }
245        
246  /**
247   * Puts an integer into the buffer as 4 bytes, big-endian.
248   */
249  protected void xdr_int(int i) {
250    buffer[offset++] = (byte)((i >> 24) & 0xff);
251    buffer[offset++] = (byte)((i >> 16) & 0xff);
252    buffer[offset++] = (byte)((i >> 8) & 0xff);
253    buffer[offset++] = (byte)(i & 0xff);
254  }
255}