001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.metrics2.sink.ganglia; 020 021import java.io.IOException; 022import java.util.Collection; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.Map; 027import java.util.Set; 028 029import org.apache.commons.configuration.SubsetConfiguration; 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032import org.apache.hadoop.classification.InterfaceAudience; 033import org.apache.hadoop.metrics2.AbstractMetric; 034import org.apache.hadoop.metrics2.MetricsException; 035import org.apache.hadoop.metrics2.MetricsRecord; 036import org.apache.hadoop.metrics2.MetricsTag; 037import org.apache.hadoop.metrics2.impl.MsInfo; 038import org.apache.hadoop.metrics2.util.MetricsCache; 039import org.apache.hadoop.metrics2.util.MetricsCache.Record; 040 041/** 042 * This code supports Ganglia 3.0 043 * 044 */ 045public class GangliaSink30 extends AbstractGangliaSink { 046 047 public final Log LOG = LogFactory.getLog(this.getClass()); 048 049 private static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix."; 050 051 private MetricsCache metricsCache = new MetricsCache(); 052 053 // a key with a NULL value means ALL 054 private Map<String,Set<String>> useTagsMap = new HashMap<String,Set<String>>(); 055 056 @Override 057 @SuppressWarnings("unchecked") 058 public void init(SubsetConfiguration conf) { 059 super.init(conf); 060 061 conf.setListDelimiter(','); 062 Iterator<String> it = (Iterator<String>) conf.getKeys(); 063 while (it.hasNext()) { 064 String propertyName = it.next(); 065 if (propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) { 066 String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length()); 067 String[] tags = conf.getStringArray(propertyName); 068 boolean useAllTags = false; 069 Set<String> set = null; 070 if (tags.length > 0) { 071 set = new HashSet<String>(); 072 for (String tag : tags) { 073 tag = tag.trim(); 074 useAllTags |= tag.equals("*"); 075 if (tag.length() > 0) { 076 set.add(tag); 077 } 078 } 079 if (useAllTags) { 080 set = null; 081 } 082 } 083 useTagsMap.put(contextName, set); 084 } 085 } 086 } 087 088 @InterfaceAudience.Private 089 public void appendPrefix(MetricsRecord record, StringBuilder sb) { 090 String contextName = record.context(); 091 Collection<MetricsTag> tags = record.tags(); 092 if (useTagsMap.containsKey(contextName)) { 093 Set<String> useTags = useTagsMap.get(contextName); 094 for (MetricsTag t : tags) { 095 if (useTags == null || useTags.contains(t.name())) { 096 097 // the context is always skipped here because it is always added 098 099 // the hostname is always skipped to avoid case-mismatches 100 // from different DNSes. 101 102 if (t.info() != MsInfo.Context && t.info() != MsInfo.Hostname && t.value() != null) { 103 sb.append('.').append(t.name()).append('=').append(t.value()); 104 } 105 } 106 } 107 } 108 } 109 110 @Override 111 public void putMetrics(MetricsRecord record) { 112 // The method handles both cases whether Ganglia support dense publish 113 // of metrics of sparse (only on change) publish of metrics 114 try { 115 String recordName = record.name(); 116 String contextName = record.context(); 117 118 StringBuilder sb = new StringBuilder(); 119 sb.append(contextName); 120 sb.append('.'); 121 sb.append(recordName); 122 123 appendPrefix(record, sb); 124 125 String groupName = sb.toString(); 126 sb.append('.'); 127 int sbBaseLen = sb.length(); 128 129 String type = null; 130 GangliaSlope slopeFromMetric = null; 131 GangliaSlope calculatedSlope = null; 132 Record cachedMetrics = null; 133 resetBuffer(); // reset the buffer to the beginning 134 if (!isSupportSparseMetrics()) { 135 // for sending dense metrics, update metrics cache 136 // and get the updated data 137 cachedMetrics = metricsCache.update(record); 138 139 if (cachedMetrics != null && cachedMetrics.metricsEntrySet() != null) { 140 for (Map.Entry<String, AbstractMetric> entry : cachedMetrics 141 .metricsEntrySet()) { 142 AbstractMetric metric = entry.getValue(); 143 sb.append(metric.name()); 144 String name = sb.toString(); 145 146 // visit the metric to identify the Ganglia type and 147 // slope 148 metric.visit(gangliaMetricVisitor); 149 type = gangliaMetricVisitor.getType(); 150 slopeFromMetric = gangliaMetricVisitor.getSlope(); 151 152 GangliaConf gConf = getGangliaConfForMetric(name); 153 calculatedSlope = calculateSlope(gConf, slopeFromMetric); 154 155 // send metric to Ganglia 156 emitMetric(groupName, name, type, metric.value().toString(), gConf, 157 calculatedSlope); 158 159 // reset the length of the buffer for next iteration 160 sb.setLength(sbBaseLen); 161 } 162 } 163 } else { 164 // we support sparse updates 165 166 Collection<AbstractMetric> metrics = (Collection<AbstractMetric>) record 167 .metrics(); 168 if (metrics.size() > 0) { 169 // we got metrics. so send the latest 170 for (AbstractMetric metric : record.metrics()) { 171 sb.append(metric.name()); 172 String name = sb.toString(); 173 174 // visit the metric to identify the Ganglia type and 175 // slope 176 metric.visit(gangliaMetricVisitor); 177 type = gangliaMetricVisitor.getType(); 178 slopeFromMetric = gangliaMetricVisitor.getSlope(); 179 180 GangliaConf gConf = getGangliaConfForMetric(name); 181 calculatedSlope = calculateSlope(gConf, slopeFromMetric); 182 183 // send metric to Ganglia 184 emitMetric(groupName, name, type, metric.value().toString(), gConf, 185 calculatedSlope); 186 187 // reset the length of the buffer for next iteration 188 sb.setLength(sbBaseLen); 189 } 190 } 191 } 192 } catch (IOException io) { 193 throw new MetricsException("Failed to putMetrics", io); 194 } 195 } 196 197 // Calculate the slope from properties and metric 198 private GangliaSlope calculateSlope(GangliaConf gConf, 199 GangliaSlope slopeFromMetric) { 200 if (gConf.getSlope() != null) { 201 // if slope has been specified in properties, use that 202 return gConf.getSlope(); 203 } else if (slopeFromMetric != null) { 204 // slope not specified in properties, use derived from Metric 205 return slopeFromMetric; 206 } else { 207 return DEFAULT_SLOPE; 208 } 209 } 210 211 /** 212 * The method sends metrics to Ganglia servers. The method has been taken from 213 * org.apache.hadoop.metrics.ganglia.GangliaContext30 with minimal changes in 214 * order to keep it in sync. 215 * @param groupName The group name of the metric 216 * @param name The metric name 217 * @param type The type of the metric 218 * @param value The value of the metric 219 * @param gConf The GangliaConf for this metric 220 * @param gSlope The slope for this metric 221 * @throws IOException 222 */ 223 protected void emitMetric(String groupName, String name, String type, 224 String value, GangliaConf gConf, GangliaSlope gSlope) throws IOException { 225 226 if (name == null) { 227 LOG.warn("Metric was emitted with no name."); 228 return; 229 } else if (value == null) { 230 LOG.warn("Metric name " + name + " was emitted with a null value."); 231 return; 232 } else if (type == null) { 233 LOG.warn("Metric name " + name + ", value " + value + " has no type."); 234 return; 235 } 236 237 if (LOG.isDebugEnabled()) { 238 LOG.debug("Emitting metric " + name + ", type " + type + ", value " 239 + value + ", slope " + gSlope.name() + " from hostname " 240 + getHostName()); 241 } 242 243 xdr_int(0); // metric_user_defined 244 xdr_string(type); 245 xdr_string(name); 246 xdr_string(value); 247 xdr_string(gConf.getUnits()); 248 xdr_int(gSlope.ordinal()); 249 xdr_int(gConf.getTmax()); 250 xdr_int(gConf.getDmax()); 251 252 // send the metric to Ganglia hosts 253 emitToGangliaHosts(); 254 } 255}