001/*
002 * GangliaContext.java
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package org.apache.hadoop.metrics.ganglia;
022
023import java.io.IOException;
024import java.net.*;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028
029import org.apache.commons.io.Charsets;
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032
033import org.apache.hadoop.classification.InterfaceAudience;
034import org.apache.hadoop.classification.InterfaceStability;
035import org.apache.hadoop.metrics.ContextFactory;
036import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
037import org.apache.hadoop.metrics.spi.OutputRecord;
038import org.apache.hadoop.metrics.spi.Util;
039
040/**
041 * Context for sending metrics to Ganglia.
042 *
043 * @deprecated Use {@link org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30}
044 * instead.
045 */
046@Deprecated
047@InterfaceAudience.Public
048@InterfaceStability.Evolving
049public class GangliaContext extends AbstractMetricsContext {
050    
051  private static final String PERIOD_PROPERTY = "period";
052  private static final String SERVERS_PROPERTY = "servers";
053  private static final String UNITS_PROPERTY = "units";
054  private static final String SLOPE_PROPERTY = "slope";
055  private static final String TMAX_PROPERTY = "tmax";
056  private static final String DMAX_PROPERTY = "dmax";
057  private static final String MULTICAST_PROPERTY = "multicast";
058  private static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
059
060  private static final String DEFAULT_UNITS = "";
061  private static final String DEFAULT_SLOPE = "both";
062  private static final int DEFAULT_TMAX = 60;
063  private static final int DEFAULT_DMAX = 0;
064  private static final int DEFAULT_PORT = 8649;
065  private static final int BUFFER_SIZE = 1500;       // as per libgmond.c
066  private static final int DEFAULT_MULTICAST_TTL = 1;
067
068  private final Log LOG = LogFactory.getLog(this.getClass());    
069
070  private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
071    
072  static {
073    typeTable.put(String.class, "string");
074    typeTable.put(Byte.class, "int8");
075    typeTable.put(Short.class, "int16");
076    typeTable.put(Integer.class, "int32");
077    typeTable.put(Long.class, "float");
078    typeTable.put(Float.class, "float");
079  }
080    
081  protected byte[] buffer = new byte[BUFFER_SIZE];
082  protected int offset;
083    
084  protected List<? extends SocketAddress> metricsServers;
085  private Map<String,String> unitsTable;
086  private Map<String,String> slopeTable;
087  private Map<String,String> tmaxTable;
088  private Map<String,String> dmaxTable;
089  private boolean multicastEnabled;
090  private int multicastTtl;
091    
092  protected DatagramSocket datagramSocket;
093    
094  /** Creates a new instance of GangliaContext */
095  @InterfaceAudience.Private
096  public GangliaContext() {
097  }
098    
099  @Override
100  @InterfaceAudience.Private
101  public void init(String contextName, ContextFactory factory) {
102    super.init(contextName, factory);
103    parseAndSetPeriod(PERIOD_PROPERTY);
104        
105    metricsServers = 
106      Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT); 
107        
108    unitsTable = getAttributeTable(UNITS_PROPERTY);
109    slopeTable = getAttributeTable(SLOPE_PROPERTY);
110    tmaxTable  = getAttributeTable(TMAX_PROPERTY);
111    dmaxTable  = getAttributeTable(DMAX_PROPERTY);
112    multicastEnabled = Boolean.parseBoolean(getAttribute(MULTICAST_PROPERTY));
113    String multicastTtlValue = getAttribute(MULTICAST_TTL_PROPERTY);
114    if (multicastEnabled) {
115      if (multicastTtlValue == null) {
116        multicastTtl = DEFAULT_MULTICAST_TTL;
117      } else {
118        multicastTtl = Integer.parseInt(multicastTtlValue);
119      }
120    }
121        
122    try {
123      if (multicastEnabled) {
124        LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
125        datagramSocket = new MulticastSocket();
126        ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
127      } else {
128        datagramSocket = new DatagramSocket();
129      }
130    } catch (IOException e) {
131      LOG.error(e);
132    }
133  }
134
135    /**
136   * method to close the datagram socket
137   */
138  @Override
139  public void close() {
140    super.close();
141    if (datagramSocket != null) {
142      datagramSocket.close();
143    }
144  }
145  
146  @Override
147  @InterfaceAudience.Private
148  public void emitRecord(String contextName, String recordName,
149    OutputRecord outRec) 
150  throws IOException {
151    // Setup so that the records have the proper leader names so they are
152    // unambiguous at the ganglia level, and this prevents a lot of rework
153    StringBuilder sb = new StringBuilder();
154    sb.append(contextName);
155    sb.append('.');
156
157    if (contextName.equals("jvm") && outRec.getTag("processName") != null) {
158      sb.append(outRec.getTag("processName"));
159      sb.append('.');
160    }
161
162    sb.append(recordName);
163    sb.append('.');
164    int sbBaseLen = sb.length();
165
166    // emit each metric in turn
167    for (String metricName : outRec.getMetricNames()) {
168      Object metric = outRec.getMetric(metricName);
169      String type = typeTable.get(metric.getClass());
170      if (type != null) {
171        sb.append(metricName);
172        emitMetric(sb.toString(), type, metric.toString());
173        sb.setLength(sbBaseLen);
174      } else {
175        LOG.warn("Unknown metrics type: " + metric.getClass());
176      }
177    }
178  }
179    
180  protected void emitMetric(String name, String type,  String value) 
181  throws IOException {
182    String units = getUnits(name);
183    int slope = getSlope(name);
184    int tmax = getTmax(name);
185    int dmax = getDmax(name);
186        
187    offset = 0;
188    xdr_int(0);             // metric_user_defined
189    xdr_string(type);
190    xdr_string(name);
191    xdr_string(value);
192    xdr_string(units);
193    xdr_int(slope);
194    xdr_int(tmax);
195    xdr_int(dmax);
196        
197    for (SocketAddress socketAddress : metricsServers) {
198      DatagramPacket packet = 
199        new DatagramPacket(buffer, offset, socketAddress);
200      datagramSocket.send(packet);
201    }
202  }
203    
204  protected String getUnits(String metricName) {
205    String result = unitsTable.get(metricName);
206    if (result == null) {
207      result = DEFAULT_UNITS;
208    }
209    return result;
210  }
211    
212  protected int getSlope(String metricName) {
213    String slopeString = slopeTable.get(metricName);
214    if (slopeString == null) {
215      slopeString = DEFAULT_SLOPE; 
216    }
217    return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
218  }
219    
220  protected int getTmax(String metricName) {
221    if (tmaxTable == null) {
222      return DEFAULT_TMAX;
223    }
224    String tmaxString = tmaxTable.get(metricName);
225    if (tmaxString == null) {
226      return DEFAULT_TMAX;
227    }
228    else {
229      return Integer.parseInt(tmaxString);
230    }
231  }
232    
233  protected int getDmax(String metricName) {
234    String dmaxString = dmaxTable.get(metricName);
235    if (dmaxString == null) {
236      return DEFAULT_DMAX;
237    }
238    else {
239      return Integer.parseInt(dmaxString);
240    }
241  }
242    
243  /**
244   * Puts a string into the buffer by first writing the size of the string
245   * as an int, followed by the bytes of the string, padded if necessary to
246   * a multiple of 4.
247   */
248  protected void xdr_string(String s) {
249    byte[] bytes = s.getBytes(Charsets.UTF_8);
250    int len = bytes.length;
251    xdr_int(len);
252    System.arraycopy(bytes, 0, buffer, offset, len);
253    offset += len;
254    pad();
255  }
256
257  /**
258   * Pads the buffer with zero bytes up to the nearest multiple of 4.
259   */
260  private void pad() {
261    int newOffset = ((offset + 3) / 4) * 4;
262    while (offset < newOffset) {
263      buffer[offset++] = 0;
264    }
265  }
266        
267  /**
268   * Puts an integer into the buffer as 4 bytes, big-endian.
269   */
270  protected void xdr_int(int i) {
271    buffer[offset++] = (byte)((i >> 24) & 0xff);
272    buffer[offset++] = (byte)((i >> 16) & 0xff);
273    buffer[offset++] = (byte)((i >> 8) & 0xff);
274    buffer[offset++] = (byte)(i & 0xff);
275  }
276}