001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.metrics2.sink.ganglia;
020
021import java.io.IOException;
022import java.util.Collection;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.Map;
027import java.util.Set;
028
029import org.apache.commons.configuration.SubsetConfiguration;
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032import org.apache.hadoop.classification.InterfaceAudience;
033import org.apache.hadoop.metrics2.AbstractMetric;
034import org.apache.hadoop.metrics2.MetricsException;
035import org.apache.hadoop.metrics2.MetricsRecord;
036import org.apache.hadoop.metrics2.MetricsTag;
037import org.apache.hadoop.metrics2.impl.MsInfo;
038import org.apache.hadoop.metrics2.util.MetricsCache;
039import org.apache.hadoop.metrics2.util.MetricsCache.Record;
040
041/**
042 * This code supports Ganglia 3.0
043 * 
044 */
045public class GangliaSink30 extends AbstractGangliaSink {
046
047  public final Log LOG = LogFactory.getLog(this.getClass());
048
049  private static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
050  
051  private MetricsCache metricsCache = new MetricsCache();
052
053  // a key with a NULL value means ALL
054  private Map<String,Set<String>> useTagsMap = new HashMap<String,Set<String>>();
055
056  @Override
057  @SuppressWarnings("unchecked")
058  public void init(SubsetConfiguration conf) {
059    super.init(conf);
060
061    conf.setListDelimiter(',');
062    Iterator<String> it = (Iterator<String>) conf.getKeys();
063    while (it.hasNext()) {
064      String propertyName = it.next();
065      if (propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) {
066        String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length());
067        String[] tags = conf.getStringArray(propertyName);
068        boolean useAllTags = false;
069        Set<String> set = null;
070        if (tags.length > 0) {
071          set = new HashSet<String>();
072          for (String tag : tags) {
073            tag = tag.trim();
074            useAllTags |= tag.equals("*");
075            if (tag.length() > 0) {
076              set.add(tag);
077            }
078          }
079          if (useAllTags) {
080            set = null;
081          }
082        }
083        useTagsMap.put(contextName, set);
084      }
085    }
086  }
087
088  @InterfaceAudience.Private
089  public void appendPrefix(MetricsRecord record, StringBuilder sb) {
090    String contextName = record.context();
091    Collection<MetricsTag> tags = record.tags();
092    if (useTagsMap.containsKey(contextName)) {
093      Set<String> useTags = useTagsMap.get(contextName);
094      for (MetricsTag t : tags) {
095        if (useTags == null || useTags.contains(t.name())) {
096
097          // the context is always skipped here because it is always added
098          
099          // the hostname is always skipped to avoid case-mismatches 
100          // from different DNSes.
101
102          if (t.info() != MsInfo.Context && t.info() != MsInfo.Hostname && t.value() != null) {
103            sb.append('.').append(t.name()).append('=').append(t.value());
104          }
105        }
106      }
107    }          
108  }
109  
110  @Override
111  public void putMetrics(MetricsRecord record) {
112    // The method handles both cases whether Ganglia support dense publish
113    // of metrics of sparse (only on change) publish of metrics
114    try {
115      String recordName = record.name();
116      String contextName = record.context();
117
118      StringBuilder sb = new StringBuilder();
119      sb.append(contextName);
120      sb.append('.');
121      sb.append(recordName);
122
123      appendPrefix(record, sb);
124      
125      String groupName = sb.toString();
126      sb.append('.');
127      int sbBaseLen = sb.length();
128
129      String type = null;
130      GangliaSlope slopeFromMetric = null;
131      GangliaSlope calculatedSlope = null;
132      Record cachedMetrics = null;
133      resetBuffer();  // reset the buffer to the beginning
134      if (!isSupportSparseMetrics()) {
135        // for sending dense metrics, update metrics cache
136        // and get the updated data
137        cachedMetrics = metricsCache.update(record);
138
139        if (cachedMetrics != null && cachedMetrics.metricsEntrySet() != null) {
140          for (Map.Entry<String, AbstractMetric> entry : cachedMetrics
141              .metricsEntrySet()) {
142            AbstractMetric metric = entry.getValue();
143            sb.append(metric.name());
144            String name = sb.toString();
145
146            // visit the metric to identify the Ganglia type and
147            // slope
148            metric.visit(gangliaMetricVisitor);
149            type = gangliaMetricVisitor.getType();
150            slopeFromMetric = gangliaMetricVisitor.getSlope();
151
152            GangliaConf gConf = getGangliaConfForMetric(name);
153            calculatedSlope = calculateSlope(gConf, slopeFromMetric);
154
155            // send metric to Ganglia
156            emitMetric(groupName, name, type, metric.value().toString(), gConf,
157                calculatedSlope);
158
159            // reset the length of the buffer for next iteration
160            sb.setLength(sbBaseLen);
161          }
162        }
163      } else {
164        // we support sparse updates
165
166        Collection<AbstractMetric> metrics = (Collection<AbstractMetric>) record
167            .metrics();
168        if (metrics.size() > 0) {
169          // we got metrics. so send the latest
170          for (AbstractMetric metric : record.metrics()) {
171            sb.append(metric.name());
172            String name = sb.toString();
173
174            // visit the metric to identify the Ganglia type and
175            // slope
176            metric.visit(gangliaMetricVisitor);
177            type = gangliaMetricVisitor.getType();
178            slopeFromMetric = gangliaMetricVisitor.getSlope();
179
180            GangliaConf gConf = getGangliaConfForMetric(name);
181            calculatedSlope = calculateSlope(gConf, slopeFromMetric);
182
183            // send metric to Ganglia
184            emitMetric(groupName, name, type, metric.value().toString(), gConf,
185                calculatedSlope);
186
187            // reset the length of the buffer for next iteration
188            sb.setLength(sbBaseLen);
189          }
190        }
191      }
192    } catch (IOException io) {
193      throw new MetricsException("Failed to putMetrics", io);
194    }
195  }
196
197  // Calculate the slope from properties and metric
198  private GangliaSlope calculateSlope(GangliaConf gConf,
199      GangliaSlope slopeFromMetric) {
200    if (gConf.getSlope() != null) {
201      // if slope has been specified in properties, use that
202      return gConf.getSlope();
203    } else if (slopeFromMetric != null) {
204      // slope not specified in properties, use derived from Metric
205      return slopeFromMetric;
206    } else {
207      return DEFAULT_SLOPE;
208    }
209  }
210
211  /**
212   * The method sends metrics to Ganglia servers. The method has been taken from
213   * org.apache.hadoop.metrics.ganglia.GangliaContext30 with minimal changes in
214   * order to keep it in sync.
215   * @param groupName The group name of the metric
216   * @param name The metric name
217   * @param type The type of the metric
218   * @param value The value of the metric
219   * @param gConf The GangliaConf for this metric
220   * @param gSlope The slope for this metric
221   * @throws IOException
222   */
223  protected void emitMetric(String groupName, String name, String type,
224      String value, GangliaConf gConf, GangliaSlope gSlope) throws IOException {
225
226    if (name == null) {
227      LOG.warn("Metric was emitted with no name.");
228      return;
229    } else if (value == null) {
230      LOG.warn("Metric name " + name + " was emitted with a null value.");
231      return;
232    } else if (type == null) {
233      LOG.warn("Metric name " + name + ", value " + value + " has no type.");
234      return;
235    }
236
237    if (LOG.isDebugEnabled()) {
238      LOG.debug("Emitting metric " + name + ", type " + type + ", value "
239          + value + ", slope " + gSlope.name() + " from hostname "
240          + getHostName());
241    }
242
243    xdr_int(0); // metric_user_defined
244    xdr_string(type);
245    xdr_string(name);
246    xdr_string(value);
247    xdr_string(gConf.getUnits());
248    xdr_int(gSlope.ordinal());
249    xdr_int(gConf.getTmax());
250    xdr_int(gConf.getDmax());
251
252    // send the metric to Ganglia hosts
253    emitToGangliaHosts();
254  }
255}