001    /*
002     * GangliaContext.java
003     *
004     * Licensed to the Apache Software Foundation (ASF) under one
005     * or more contributor license agreements.  See the NOTICE file
006     * distributed with this work for additional information
007     * regarding copyright ownership.  The ASF licenses this file
008     * to you under the Apache License, Version 2.0 (the
009     * "License"); you may not use this file except in compliance
010     * with the License.  You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package org.apache.hadoop.metrics.ganglia;
022    
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;
import org.apache.hadoop.metrics.spi.Util;
041    
042    /**
043     * Context for sending metrics to Ganglia.
044     * 
045     */
046    @InterfaceAudience.Public
047    @InterfaceStability.Evolving
048    public class GangliaContext extends AbstractMetricsContext {
049        
050      private static final String PERIOD_PROPERTY = "period";
051      private static final String SERVERS_PROPERTY = "servers";
052      private static final String UNITS_PROPERTY = "units";
053      private static final String SLOPE_PROPERTY = "slope";
054      private static final String TMAX_PROPERTY = "tmax";
055      private static final String DMAX_PROPERTY = "dmax";
056        
057      private static final String DEFAULT_UNITS = "";
058      private static final String DEFAULT_SLOPE = "both";
059      private static final int DEFAULT_TMAX = 60;
060      private static final int DEFAULT_DMAX = 0;
061      private static final int DEFAULT_PORT = 8649;
062      private static final int BUFFER_SIZE = 1500;       // as per libgmond.c
063    
064      private final Log LOG = LogFactory.getLog(this.getClass());    
065    
066      private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
067        
068      static {
069        typeTable.put(String.class, "string");
070        typeTable.put(Byte.class, "int8");
071        typeTable.put(Short.class, "int16");
072        typeTable.put(Integer.class, "int32");
073        typeTable.put(Long.class, "float");
074        typeTable.put(Float.class, "float");
075      }
076        
077      protected byte[] buffer = new byte[BUFFER_SIZE];
078      protected int offset;
079        
080      protected List<? extends SocketAddress> metricsServers;
081      private Map<String,String> unitsTable;
082      private Map<String,String> slopeTable;
083      private Map<String,String> tmaxTable;
084      private Map<String,String> dmaxTable;
085        
086      protected DatagramSocket datagramSocket;
087        
088      /** Creates a new instance of GangliaContext */
089      @InterfaceAudience.Private
090      public GangliaContext() {
091      }
092        
093      @InterfaceAudience.Private
094      public void init(String contextName, ContextFactory factory) {
095        super.init(contextName, factory);
096        parseAndSetPeriod(PERIOD_PROPERTY);
097            
098        metricsServers = 
099          Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT); 
100            
101        unitsTable = getAttributeTable(UNITS_PROPERTY);
102        slopeTable = getAttributeTable(SLOPE_PROPERTY);
103        tmaxTable  = getAttributeTable(TMAX_PROPERTY);
104        dmaxTable  = getAttributeTable(DMAX_PROPERTY);
105            
106        try {
107          datagramSocket = new DatagramSocket();
108        }
109        catch (SocketException se) {
110          se.printStackTrace();
111        }
112      }
113    
114        /**
115       * method to close the datagram socket
116       */
117      @Override
118      public void close() {
119        super.close();
120        if (datagramSocket != null) {
121          datagramSocket.close();
122        }
123      }
124      
125      @InterfaceAudience.Private
126      public void emitRecord(String contextName, String recordName,
127        OutputRecord outRec) 
128      throws IOException {
129        // Setup so that the records have the proper leader names so they are
130        // unambiguous at the ganglia level, and this prevents a lot of rework
131        StringBuilder sb = new StringBuilder();
132        sb.append(contextName);
133        sb.append('.');
134    
135        if (contextName.equals("jvm") && outRec.getTag("processName") != null) {
136          sb.append(outRec.getTag("processName"));
137          sb.append('.');
138        }
139    
140        sb.append(recordName);
141        sb.append('.');
142        int sbBaseLen = sb.length();
143    
144        // emit each metric in turn
145        for (String metricName : outRec.getMetricNames()) {
146          Object metric = outRec.getMetric(metricName);
147          String type = typeTable.get(metric.getClass());
148          if (type != null) {
149            sb.append(metricName);
150            emitMetric(sb.toString(), type, metric.toString());
151            sb.setLength(sbBaseLen);
152          } else {
153            LOG.warn("Unknown metrics type: " + metric.getClass());
154          }
155        }
156      }
157        
158      protected void emitMetric(String name, String type,  String value) 
159      throws IOException {
160        String units = getUnits(name);
161        int slope = getSlope(name);
162        int tmax = getTmax(name);
163        int dmax = getDmax(name);
164            
165        offset = 0;
166        xdr_int(0);             // metric_user_defined
167        xdr_string(type);
168        xdr_string(name);
169        xdr_string(value);
170        xdr_string(units);
171        xdr_int(slope);
172        xdr_int(tmax);
173        xdr_int(dmax);
174            
175        for (SocketAddress socketAddress : metricsServers) {
176          DatagramPacket packet = 
177            new DatagramPacket(buffer, offset, socketAddress);
178          datagramSocket.send(packet);
179        }
180      }
181        
182      protected String getUnits(String metricName) {
183        String result = unitsTable.get(metricName);
184        if (result == null) {
185          result = DEFAULT_UNITS;
186        }
187        return result;
188      }
189        
190      protected int getSlope(String metricName) {
191        String slopeString = slopeTable.get(metricName);
192        if (slopeString == null) {
193          slopeString = DEFAULT_SLOPE; 
194        }
195        return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
196      }
197        
198      protected int getTmax(String metricName) {
199        if (tmaxTable == null) {
200          return DEFAULT_TMAX;
201        }
202        String tmaxString = tmaxTable.get(metricName);
203        if (tmaxString == null) {
204          return DEFAULT_TMAX;
205        }
206        else {
207          return Integer.parseInt(tmaxString);
208        }
209      }
210        
211      protected int getDmax(String metricName) {
212        String dmaxString = dmaxTable.get(metricName);
213        if (dmaxString == null) {
214          return DEFAULT_DMAX;
215        }
216        else {
217          return Integer.parseInt(dmaxString);
218        }
219      }
220        
221      /**
222       * Puts a string into the buffer by first writing the size of the string
223       * as an int, followed by the bytes of the string, padded if necessary to
224       * a multiple of 4.
225       */
226      protected void xdr_string(String s) {
227        byte[] bytes = s.getBytes();
228        int len = bytes.length;
229        xdr_int(len);
230        System.arraycopy(bytes, 0, buffer, offset, len);
231        offset += len;
232        pad();
233      }
234    
235      /**
236       * Pads the buffer with zero bytes up to the nearest multiple of 4.
237       */
238      private void pad() {
239        int newOffset = ((offset + 3) / 4) * 4;
240        while (offset < newOffset) {
241          buffer[offset++] = 0;
242        }
243      }
244            
245      /**
246       * Puts an integer into the buffer as 4 bytes, big-endian.
247       */
248      protected void xdr_int(int i) {
249        buffer[offset++] = (byte)((i >> 24) & 0xff);
250        buffer[offset++] = (byte)((i >> 16) & 0xff);
251        buffer[offset++] = (byte)((i >> 8) & 0xff);
252        buffer[offset++] = (byte)(i & 0xff);
253      }
254    }