001/*
002 * GangliaContext.java
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package org.apache.hadoop.metrics.ganglia;
022
023import java.io.IOException;
024import java.net.DatagramPacket;
025import java.net.DatagramSocket;
026import java.net.SocketAddress;
027import java.net.SocketException;
028import java.util.HashMap;
029import java.util.List;
030import java.util.Map;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceStability;
037import org.apache.hadoop.metrics.ContextFactory;
038import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
039import org.apache.hadoop.metrics.spi.OutputRecord;
040import org.apache.hadoop.metrics.spi.Util;
041
042/**
043 * Context for sending metrics to Ganglia.
044 * 
045 */
046@InterfaceAudience.Public
047@InterfaceStability.Evolving
048public class GangliaContext extends AbstractMetricsContext {
049    
050  private static final String PERIOD_PROPERTY = "period";
051  private static final String SERVERS_PROPERTY = "servers";
052  private static final String UNITS_PROPERTY = "units";
053  private static final String SLOPE_PROPERTY = "slope";
054  private static final String TMAX_PROPERTY = "tmax";
055  private static final String DMAX_PROPERTY = "dmax";
056    
057  private static final String DEFAULT_UNITS = "";
058  private static final String DEFAULT_SLOPE = "both";
059  private static final int DEFAULT_TMAX = 60;
060  private static final int DEFAULT_DMAX = 0;
061  private static final int DEFAULT_PORT = 8649;
062  private static final int BUFFER_SIZE = 1500;       // as per libgmond.c
063
064  private final Log LOG = LogFactory.getLog(this.getClass());    
065
066  private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
067    
068  static {
069    typeTable.put(String.class, "string");
070    typeTable.put(Byte.class, "int8");
071    typeTable.put(Short.class, "int16");
072    typeTable.put(Integer.class, "int32");
073    typeTable.put(Long.class, "float");
074    typeTable.put(Float.class, "float");
075  }
076    
077  protected byte[] buffer = new byte[BUFFER_SIZE];
078  protected int offset;
079    
080  protected List<? extends SocketAddress> metricsServers;
081  private Map<String,String> unitsTable;
082  private Map<String,String> slopeTable;
083  private Map<String,String> tmaxTable;
084  private Map<String,String> dmaxTable;
085    
086  protected DatagramSocket datagramSocket;
087    
088  /** Creates a new instance of GangliaContext */
089  @InterfaceAudience.Private
090  public GangliaContext() {
091  }
092    
093  @InterfaceAudience.Private
094  public void init(String contextName, ContextFactory factory) {
095    super.init(contextName, factory);
096    parseAndSetPeriod(PERIOD_PROPERTY);
097        
098    metricsServers = 
099      Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT); 
100        
101    unitsTable = getAttributeTable(UNITS_PROPERTY);
102    slopeTable = getAttributeTable(SLOPE_PROPERTY);
103    tmaxTable  = getAttributeTable(TMAX_PROPERTY);
104    dmaxTable  = getAttributeTable(DMAX_PROPERTY);
105        
106    try {
107      datagramSocket = new DatagramSocket();
108    }
109    catch (SocketException se) {
110      se.printStackTrace();
111    }
112  }
113
114    /**
115   * method to close the datagram socket
116   */
117  @Override
118  public void close() {
119    super.close();
120    if (datagramSocket != null) {
121      datagramSocket.close();
122    }
123  }
124  
125  @InterfaceAudience.Private
126  public void emitRecord(String contextName, String recordName,
127    OutputRecord outRec) 
128  throws IOException {
129    // Setup so that the records have the proper leader names so they are
130    // unambiguous at the ganglia level, and this prevents a lot of rework
131    StringBuilder sb = new StringBuilder();
132    sb.append(contextName);
133    sb.append('.');
134
135    if (contextName.equals("jvm") && outRec.getTag("processName") != null) {
136      sb.append(outRec.getTag("processName"));
137      sb.append('.');
138    }
139
140    sb.append(recordName);
141    sb.append('.');
142    int sbBaseLen = sb.length();
143
144    // emit each metric in turn
145    for (String metricName : outRec.getMetricNames()) {
146      Object metric = outRec.getMetric(metricName);
147      String type = typeTable.get(metric.getClass());
148      if (type != null) {
149        sb.append(metricName);
150        emitMetric(sb.toString(), type, metric.toString());
151        sb.setLength(sbBaseLen);
152      } else {
153        LOG.warn("Unknown metrics type: " + metric.getClass());
154      }
155    }
156  }
157    
158  protected void emitMetric(String name, String type,  String value) 
159  throws IOException {
160    String units = getUnits(name);
161    int slope = getSlope(name);
162    int tmax = getTmax(name);
163    int dmax = getDmax(name);
164        
165    offset = 0;
166    xdr_int(0);             // metric_user_defined
167    xdr_string(type);
168    xdr_string(name);
169    xdr_string(value);
170    xdr_string(units);
171    xdr_int(slope);
172    xdr_int(tmax);
173    xdr_int(dmax);
174        
175    for (SocketAddress socketAddress : metricsServers) {
176      DatagramPacket packet = 
177        new DatagramPacket(buffer, offset, socketAddress);
178      datagramSocket.send(packet);
179    }
180  }
181    
182  protected String getUnits(String metricName) {
183    String result = unitsTable.get(metricName);
184    if (result == null) {
185      result = DEFAULT_UNITS;
186    }
187    return result;
188  }
189    
190  protected int getSlope(String metricName) {
191    String slopeString = slopeTable.get(metricName);
192    if (slopeString == null) {
193      slopeString = DEFAULT_SLOPE; 
194    }
195    return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
196  }
197    
198  protected int getTmax(String metricName) {
199    if (tmaxTable == null) {
200      return DEFAULT_TMAX;
201    }
202    String tmaxString = tmaxTable.get(metricName);
203    if (tmaxString == null) {
204      return DEFAULT_TMAX;
205    }
206    else {
207      return Integer.parseInt(tmaxString);
208    }
209  }
210    
211  protected int getDmax(String metricName) {
212    String dmaxString = dmaxTable.get(metricName);
213    if (dmaxString == null) {
214      return DEFAULT_DMAX;
215    }
216    else {
217      return Integer.parseInt(dmaxString);
218    }
219  }
220    
221  /**
222   * Puts a string into the buffer by first writing the size of the string
223   * as an int, followed by the bytes of the string, padded if necessary to
224   * a multiple of 4.
225   */
226  protected void xdr_string(String s) {
227    byte[] bytes = s.getBytes();
228    int len = bytes.length;
229    xdr_int(len);
230    System.arraycopy(bytes, 0, buffer, offset, len);
231    offset += len;
232    pad();
233  }
234
235  /**
236   * Pads the buffer with zero bytes up to the nearest multiple of 4.
237   */
238  private void pad() {
239    int newOffset = ((offset + 3) / 4) * 4;
240    while (offset < newOffset) {
241      buffer[offset++] = 0;
242    }
243  }
244        
245  /**
246   * Puts an integer into the buffer as 4 bytes, big-endian.
247   */
248  protected void xdr_int(int i) {
249    buffer[offset++] = (byte)((i >> 24) & 0xff);
250    buffer[offset++] = (byte)((i >> 16) & 0xff);
251    buffer[offset++] = (byte)((i >> 8) & 0xff);
252    buffer[offset++] = (byte)(i & 0xff);
253  }
254}