001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.counters; 020 021import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.getFrameworkGroupId; 022import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.isFrameworkGroup; 023 024import java.io.DataInput; 025import java.io.DataOutput; 026import java.io.IOException; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.Map; 030import java.util.concurrent.ConcurrentSkipListMap; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.apache.hadoop.classification.InterfaceAudience; 035import org.apache.hadoop.classification.InterfaceStability; 036import org.apache.hadoop.io.Text; 037import org.apache.hadoop.io.Writable; 038import org.apache.hadoop.io.WritableUtils; 039import org.apache.hadoop.mapreduce.Counter; 040import org.apache.hadoop.mapreduce.FileSystemCounter; 041import org.apache.hadoop.mapreduce.JobCounter; 042import org.apache.hadoop.mapreduce.TaskCounter; 043import org.apache.hadoop.util.StringInterner; 044 045import com.google.common.collect.Iterables; 046import com.google.common.collect.Iterators; 047import com.google.common.collect.Maps; 048 049/** 050 * An abstract class to provide common implementation for the Counters 051 * container in both mapred and mapreduce packages. 052 * 053 * @param <C> type of counter inside the counters 054 * @param <G> type of group inside the counters 055 */ 056@InterfaceAudience.Public 057@InterfaceStability.Stable 058public abstract class AbstractCounters<C extends Counter, 059 G extends CounterGroupBase<C>> 060 implements Writable, Iterable<G> { 061 062 protected static final Log LOG = LogFactory.getLog("mapreduce.Counters"); 063 064 /** 065 * A cache from enum values to the associated counter. 066 */ 067 private final Map<Enum<?>, C> cache = Maps.newIdentityHashMap(); 068 //framework & fs groups 069 private final Map<String, G> fgroups = new ConcurrentSkipListMap<String, G>(); 070 // other groups 071 private final Map<String, G> groups = new ConcurrentSkipListMap<String, G>(); 072 private final CounterGroupFactory<C, G> groupFactory; 073 074 // For framework counter serialization without strings 075 enum GroupType { FRAMEWORK, FILESYSTEM }; 076 077 // Writes only framework and fs counters if false. 078 private boolean writeAllCounters = true; 079 080 private static final Map<String, String> legacyMap = Maps.newHashMap(); 081 static { 082 legacyMap.put("org.apache.hadoop.mapred.Task$Counter", 083 TaskCounter.class.getName()); 084 legacyMap.put("org.apache.hadoop.mapred.JobInProgress$Counter", 085 JobCounter.class.getName()); 086 legacyMap.put("FileSystemCounters", FileSystemCounter.class.getName()); 087 } 088 089 private final Limits limits = new Limits(); 090 091 @InterfaceAudience.Private 092 public AbstractCounters(CounterGroupFactory<C, G> gf) { 093 groupFactory = gf; 094 } 095 096 /** 097 * Construct from another counters object. 098 * @param <C1> type of the other counter 099 * @param <G1> type of the other counter group 100 * @param counters the counters object to copy 101 * @param groupFactory the factory for new groups 102 */ 103 @InterfaceAudience.Private 104 public <C1 extends Counter, G1 extends CounterGroupBase<C1>> 105 AbstractCounters(AbstractCounters<C1, G1> counters, 106 CounterGroupFactory<C, G> groupFactory) { 107 this.groupFactory = groupFactory; 108 for(G1 group: counters) { 109 String name = group.getName(); 110 G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits); 111 (isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup); 112 for(Counter counter: group) { 113 newGroup.addCounter(counter.getName(), counter.getDisplayName(), 114 counter.getValue()); 115 } 116 } 117 } 118 119 /** Add a group. 120 * @param group object to add 121 * @return the group 122 */ 123 @InterfaceAudience.Private 124 public synchronized G addGroup(G group) { 125 String name = group.getName(); 126 if (isFrameworkGroup(name)) { 127 fgroups.put(name, group); 128 } else { 129 limits.checkGroups(groups.size() + 1); 130 groups.put(name, group); 131 } 132 return group; 133 } 134 135 /** 136 * Add a new group 137 * @param name of the group 138 * @param displayName of the group 139 * @return the group 140 */ 141 @InterfaceAudience.Private 142 public G addGroup(String name, String displayName) { 143 return addGroup(groupFactory.newGroup(name, displayName, limits)); 144 } 145 146 /** 147 * Find a counter, create one if necessary 148 * @param groupName of the counter 149 * @param counterName name of the counter 150 * @return the matching counter 151 */ 152 public C findCounter(String groupName, String counterName) { 153 G grp = getGroup(groupName); 154 return grp.findCounter(counterName); 155 } 156 157 /** 158 * Find the counter for the given enum. The same enum will always return the 159 * same counter. 160 * @param key the counter key 161 * @return the matching counter object 162 */ 163 public synchronized C findCounter(Enum<?> key) { 164 C counter = cache.get(key); 165 if (counter == null) { 166 counter = findCounter(key.getDeclaringClass().getName(), key.name()); 167 cache.put(key, counter); 168 } 169 return counter; 170 } 171 172 /** 173 * Find the file system counter for the given scheme and enum. 174 * @param scheme of the file system 175 * @param key the enum of the counter 176 * @return the file system counter 177 */ 178 @InterfaceAudience.Private 179 public synchronized C findCounter(String scheme, FileSystemCounter key) { 180 return ((FileSystemCounterGroup<C>) getGroup( 181 FileSystemCounter.class.getName()).getUnderlyingGroup()). 182 findCounter(scheme, key); 183 } 184 185 /** 186 * Returns the names of all counter classes. 187 * @return Set of counter names. 188 */ 189 public synchronized Iterable<String> getGroupNames() { 190 HashSet<String> deprecated = new HashSet<String>(); 191 for(Map.Entry<String, String> entry : legacyMap.entrySet()) { 192 String newGroup = entry.getValue(); 193 boolean isFGroup = isFrameworkGroup(newGroup); 194 if(isFGroup ? fgroups.containsKey(newGroup) : groups.containsKey(newGroup)) { 195 deprecated.add(entry.getKey()); 196 } 197 } 198 return Iterables.concat(fgroups.keySet(), groups.keySet(), deprecated); 199 } 200 201 @Override 202 public Iterator<G> iterator() { 203 return Iterators.concat(fgroups.values().iterator(), 204 groups.values().iterator()); 205 } 206 207 /** 208 * Returns the named counter group, or an empty group if there is none 209 * with the specified name. 210 * @param groupName name of the group 211 * @return the group 212 */ 213 public synchronized G getGroup(String groupName) { 214 215 // filterGroupName 216 boolean groupNameInLegacyMap = true; 217 String newGroupName = legacyMap.get(groupName); 218 if (newGroupName == null) { 219 groupNameInLegacyMap = false; 220 newGroupName = Limits.filterGroupName(groupName); 221 } 222 223 boolean isFGroup = isFrameworkGroup(newGroupName); 224 G group = isFGroup ? fgroups.get(newGroupName) : groups.get(newGroupName); 225 if (group == null) { 226 group = groupFactory.newGroup(newGroupName, limits); 227 if (isFGroup) { 228 fgroups.put(newGroupName, group); 229 } else { 230 limits.checkGroups(groups.size() + 1); 231 groups.put(newGroupName, group); 232 } 233 if (groupNameInLegacyMap) { 234 LOG.warn("Group " + groupName + " is deprecated. Use " + newGroupName 235 + " instead"); 236 } 237 } 238 return group; 239 } 240 241 /** 242 * Returns the total number of counters, by summing the number of counters 243 * in each group. 244 * @return the total number of counters 245 */ 246 public synchronized int countCounters() { 247 int result = 0; 248 for (G group : this) { 249 result += group.size(); 250 } 251 return result; 252 } 253 254 /** 255 * Write the set of groups. 256 * Counters ::= version #fgroups (groupId, group)* #groups (group)* 257 */ 258 @Override 259 public synchronized void write(DataOutput out) throws IOException { 260 WritableUtils.writeVInt(out, groupFactory.version()); 261 WritableUtils.writeVInt(out, fgroups.size()); // framework groups first 262 for (G group : fgroups.values()) { 263 if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) { 264 WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal()); 265 WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName())); 266 group.write(out); 267 } else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) { 268 WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal()); 269 group.write(out); 270 } 271 } 272 if (writeAllCounters) { 273 WritableUtils.writeVInt(out, groups.size()); 274 for (G group : groups.values()) { 275 Text.writeString(out, group.getName()); 276 group.write(out); 277 } 278 } else { 279 WritableUtils.writeVInt(out, 0); 280 } 281 } 282 283 @Override 284 public synchronized void readFields(DataInput in) throws IOException { 285 int version = WritableUtils.readVInt(in); 286 if (version != groupFactory.version()) { 287 throw new IOException("Counters version mismatch, expected "+ 288 groupFactory.version() +" got "+ version); 289 } 290 int numFGroups = WritableUtils.readVInt(in); 291 fgroups.clear(); 292 GroupType[] groupTypes = GroupType.values(); 293 while (numFGroups-- > 0) { 294 GroupType groupType = groupTypes[WritableUtils.readVInt(in)]; 295 G group; 296 switch (groupType) { 297 case FILESYSTEM: // with nothing 298 group = groupFactory.newFileSystemGroup(); 299 break; 300 case FRAMEWORK: // with group id 301 group = groupFactory.newFrameworkGroup(WritableUtils.readVInt(in)); 302 break; 303 default: // Silence dumb compiler, as it would've thrown earlier 304 throw new IOException("Unexpected counter group type: "+ groupType); 305 } 306 group.readFields(in); 307 fgroups.put(group.getName(), group); 308 } 309 int numGroups = WritableUtils.readVInt(in); 310 while (numGroups-- > 0) { 311 limits.checkGroups(groups.size() + 1); 312 G group = groupFactory.newGenericGroup( 313 StringInterner.weakIntern(Text.readString(in)), null, limits); 314 group.readFields(in); 315 groups.put(group.getName(), group); 316 } 317 } 318 319 /** 320 * Return textual representation of the counter values. 321 * @return the string 322 */ 323 @Override 324 public synchronized String toString() { 325 StringBuilder sb = new StringBuilder("Counters: " + countCounters()); 326 for (G group: this) { 327 sb.append("\n\t").append(group.getDisplayName()); 328 for (Counter counter: group) { 329 sb.append("\n\t\t").append(counter.getDisplayName()).append("=") 330 .append(counter.getValue()); 331 } 332 } 333 return sb.toString(); 334 } 335 336 /** 337 * Increments multiple counters by their amounts in another Counters 338 * instance. 339 * @param other the other Counters instance 340 */ 341 public synchronized void incrAllCounters(AbstractCounters<C, G> other) { 342 for(G right : other) { 343 String groupName = right.getName(); 344 G left = (isFrameworkGroup(groupName) ? fgroups : groups).get(groupName); 345 if (left == null) { 346 left = addGroup(groupName, right.getDisplayName()); 347 } 348 left.incrAllCounters(right); 349 } 350 } 351 352 @Override 353 @SuppressWarnings("unchecked") 354 public boolean equals(Object genericRight) { 355 if (genericRight instanceof AbstractCounters<?, ?>) { 356 return Iterators.elementsEqual(iterator(), 357 ((AbstractCounters<C, G>)genericRight).iterator()); 358 } 359 return false; 360 } 361 362 @Override 363 public int hashCode() { 364 return groups.hashCode(); 365 } 366 367 /** 368 * Set the "writeAllCounters" option to true or false 369 * @param send if true all counters would be serialized, otherwise only 370 * framework counters would be serialized in 371 * {@link #write(DataOutput)} 372 */ 373 @InterfaceAudience.Private 374 public void setWriteAllCounters(boolean send) { 375 writeAllCounters = send; 376 } 377 378 /** 379 * Get the "writeAllCounters" option 380 * @return true of all counters would serialized 381 */ 382 @InterfaceAudience.Private 383 public boolean getWriteAllCounters() { 384 return writeAllCounters; 385 } 386 387 @InterfaceAudience.Private 388 public Limits limits() { 389 return limits; 390 } 391}