001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.counters;
020
021import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.getFrameworkGroupId;
022import static org.apache.hadoop.mapreduce.counters.CounterGroupFactory.isFrameworkGroup;
023
024import java.io.DataInput;
025import java.io.DataOutput;
026import java.io.IOException;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.Map;
030import java.util.concurrent.ConcurrentSkipListMap;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.apache.hadoop.classification.InterfaceAudience;
035import org.apache.hadoop.classification.InterfaceStability;
036import org.apache.hadoop.io.Text;
037import org.apache.hadoop.io.Writable;
038import org.apache.hadoop.io.WritableUtils;
039import org.apache.hadoop.mapreduce.Counter;
040import org.apache.hadoop.mapreduce.FileSystemCounter;
041import org.apache.hadoop.mapreduce.JobCounter;
042import org.apache.hadoop.mapreduce.TaskCounter;
043import org.apache.hadoop.util.StringInterner;
044
045import com.google.common.collect.Iterables;
046import com.google.common.collect.Iterators;
047import com.google.common.collect.Maps;
048
049/**
050 * An abstract class to provide common implementation for the Counters
051 * container in both mapred and mapreduce packages.
052 *
053 * @param <C> type of counter inside the counters
054 * @param <G> type of group inside the counters
055 */
056@InterfaceAudience.Public
057@InterfaceStability.Stable
058public abstract class AbstractCounters<C extends Counter,
059                                       G extends CounterGroupBase<C>>
060    implements Writable, Iterable<G> {
061
062  protected static final Log LOG = LogFactory.getLog("mapreduce.Counters");
063
064  /**
065   * A cache from enum values to the associated counter.
066   */
067  private final Map<Enum<?>, C> cache = Maps.newIdentityHashMap();
068  //framework & fs groups
069  private final Map<String, G> fgroups = new ConcurrentSkipListMap<String, G>();
070  // other groups
071  private final Map<String, G> groups = new ConcurrentSkipListMap<String, G>();
072  private final CounterGroupFactory<C, G> groupFactory;
073
074  // For framework counter serialization without strings
075  enum GroupType { FRAMEWORK, FILESYSTEM };
076
077  // Writes only framework and fs counters if false.
078  private boolean writeAllCounters = true;
079
080  private static final Map<String, String> legacyMap = Maps.newHashMap();
081  static {
082    legacyMap.put("org.apache.hadoop.mapred.Task$Counter",
083                  TaskCounter.class.getName());
084    legacyMap.put("org.apache.hadoop.mapred.JobInProgress$Counter",
085                  JobCounter.class.getName());
086    legacyMap.put("FileSystemCounters", FileSystemCounter.class.getName());
087  }
088
089  private final Limits limits = new Limits();
090
091  @InterfaceAudience.Private
092  public AbstractCounters(CounterGroupFactory<C, G> gf) {
093    groupFactory = gf;
094  }
095
096  /**
097   * Construct from another counters object.
098   * @param <C1> type of the other counter
099   * @param <G1> type of the other counter group
100   * @param counters the counters object to copy
101   * @param groupFactory the factory for new groups
102   */
103  @InterfaceAudience.Private
104  public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
105  AbstractCounters(AbstractCounters<C1, G1> counters,
106                   CounterGroupFactory<C, G> groupFactory) {
107    this.groupFactory = groupFactory;
108    for(G1 group: counters) {
109      String name = group.getName();
110      G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
111      (isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
112      for(Counter counter: group) {
113        newGroup.addCounter(counter.getName(), counter.getDisplayName(),
114                            counter.getValue());
115      }
116    }
117  }
118
119  /** Add a group.
120   * @param group object to add
121   * @return the group
122   */
123  @InterfaceAudience.Private
124  public synchronized G addGroup(G group) {
125    String name = group.getName();
126    if (isFrameworkGroup(name)) {
127      fgroups.put(name, group);
128    } else {
129      limits.checkGroups(groups.size() + 1);
130      groups.put(name, group);
131    }
132    return group;
133  }
134
135  /**
136   * Add a new group
137   * @param name of the group
138   * @param displayName of the group
139   * @return the group
140   */
141  @InterfaceAudience.Private
142  public G addGroup(String name, String displayName) {
143    return addGroup(groupFactory.newGroup(name, displayName, limits));
144  }
145
146  /**
147   * Find a counter, create one if necessary
148   * @param groupName of the counter
149   * @param counterName name of the counter
150   * @return the matching counter
151   */
152  public C findCounter(String groupName, String counterName) {
153    G grp = getGroup(groupName);
154    return grp.findCounter(counterName);
155  }
156
157  /**
158   * Find the counter for the given enum. The same enum will always return the
159   * same counter.
160   * @param key the counter key
161   * @return the matching counter object
162   */
163  public synchronized C findCounter(Enum<?> key) {
164    C counter = cache.get(key);
165    if (counter == null) {
166      counter = findCounter(key.getDeclaringClass().getName(), key.name());
167      cache.put(key, counter);
168    }
169    return counter;
170  }
171
172  /**
173   * Find the file system counter for the given scheme and enum.
174   * @param scheme of the file system
175   * @param key the enum of the counter
176   * @return the file system counter
177   */
178  @InterfaceAudience.Private
179  public synchronized C findCounter(String scheme, FileSystemCounter key) {
180    return ((FileSystemCounterGroup<C>) getGroup(
181        FileSystemCounter.class.getName()).getUnderlyingGroup()).
182        findCounter(scheme, key);
183  }
184
185  /**
186   * Returns the names of all counter classes.
187   * @return Set of counter names.
188   */
189  public synchronized Iterable<String> getGroupNames() {
190    HashSet<String> deprecated = new HashSet<String>();
191    for(Map.Entry<String, String> entry : legacyMap.entrySet()) {
192      String newGroup = entry.getValue();
193      boolean isFGroup = isFrameworkGroup(newGroup);
194      if(isFGroup ? fgroups.containsKey(newGroup) : groups.containsKey(newGroup)) {
195        deprecated.add(entry.getKey());
196      }
197    }
198    return Iterables.concat(fgroups.keySet(), groups.keySet(), deprecated);
199  }
200
201  @Override
202  public Iterator<G> iterator() {
203    return Iterators.concat(fgroups.values().iterator(),
204                            groups.values().iterator());
205  }
206
207  /**
208   * Returns the named counter group, or an empty group if there is none
209   * with the specified name.
210   * @param groupName name of the group
211   * @return the group
212   */
213  public synchronized G getGroup(String groupName) {
214
215    // filterGroupName
216    boolean groupNameInLegacyMap = true;
217    String newGroupName = legacyMap.get(groupName);
218    if (newGroupName == null) {
219      groupNameInLegacyMap = false;
220      newGroupName = Limits.filterGroupName(groupName);
221    }
222
223    boolean isFGroup = isFrameworkGroup(newGroupName);
224    G group = isFGroup ? fgroups.get(newGroupName) : groups.get(newGroupName);
225    if (group == null) {
226      group = groupFactory.newGroup(newGroupName, limits);
227      if (isFGroup) {
228        fgroups.put(newGroupName, group);
229      } else {
230        limits.checkGroups(groups.size() + 1);
231        groups.put(newGroupName, group);
232      }
233      if (groupNameInLegacyMap) {
234        LOG.warn("Group " + groupName + " is deprecated. Use " + newGroupName
235            + " instead");
236      }
237    }
238    return group;
239  }
240
241  /**
242   * Returns the total number of counters, by summing the number of counters
243   * in each group.
244   * @return the total number of counters
245   */
246  public synchronized int countCounters() {
247    int result = 0;
248    for (G group : this) {
249      result += group.size();
250    }
251    return result;
252  }
253
254  /**
255   * Write the set of groups.
256   * Counters ::= version #fgroups (groupId, group)* #groups (group)*
257   */
258  @Override
259  public synchronized void write(DataOutput out) throws IOException {
260    WritableUtils.writeVInt(out, groupFactory.version());
261    WritableUtils.writeVInt(out, fgroups.size());  // framework groups first
262    for (G group : fgroups.values()) {
263      if (group.getUnderlyingGroup() instanceof FrameworkCounterGroup<?, ?>) {
264        WritableUtils.writeVInt(out, GroupType.FRAMEWORK.ordinal());
265        WritableUtils.writeVInt(out, getFrameworkGroupId(group.getName()));
266        group.write(out);
267      } else if (group.getUnderlyingGroup() instanceof FileSystemCounterGroup<?>) {
268        WritableUtils.writeVInt(out, GroupType.FILESYSTEM.ordinal());
269        group.write(out);
270      }
271    }
272    if (writeAllCounters) {
273      WritableUtils.writeVInt(out, groups.size());
274      for (G group : groups.values()) {
275        Text.writeString(out, group.getName());
276        group.write(out);
277      }
278    } else {
279      WritableUtils.writeVInt(out, 0);
280    }
281  }
282
283  @Override
284  public synchronized void readFields(DataInput in) throws IOException {
285    int version = WritableUtils.readVInt(in);
286    if (version != groupFactory.version()) {
287      throw new IOException("Counters version mismatch, expected "+
288          groupFactory.version() +" got "+ version);
289    }
290    int numFGroups = WritableUtils.readVInt(in);
291    fgroups.clear();
292    GroupType[] groupTypes = GroupType.values();
293    while (numFGroups-- > 0) {
294      GroupType groupType = groupTypes[WritableUtils.readVInt(in)];
295      G group;
296      switch (groupType) {
297        case FILESYSTEM: // with nothing
298          group = groupFactory.newFileSystemGroup();
299          break;
300        case FRAMEWORK:  // with group id
301          group = groupFactory.newFrameworkGroup(WritableUtils.readVInt(in));
302          break;
303        default: // Silence dumb compiler, as it would've thrown earlier
304          throw new IOException("Unexpected counter group type: "+ groupType);
305      }
306      group.readFields(in);
307      fgroups.put(group.getName(), group);
308    }
309    int numGroups = WritableUtils.readVInt(in);
310    while (numGroups-- > 0) {
311      limits.checkGroups(groups.size() + 1);
312      G group = groupFactory.newGenericGroup(
313          StringInterner.weakIntern(Text.readString(in)), null, limits);
314      group.readFields(in);
315      groups.put(group.getName(), group);
316    }
317  }
318
319  /**
320   * Return textual representation of the counter values.
321   * @return the string
322   */
323  @Override
324  public synchronized String toString() {
325    StringBuilder sb = new StringBuilder("Counters: " + countCounters());
326    for (G group: this) {
327      sb.append("\n\t").append(group.getDisplayName());
328      for (Counter counter: group) {
329        sb.append("\n\t\t").append(counter.getDisplayName()).append("=")
330          .append(counter.getValue());
331      }
332    }
333    return sb.toString();
334  }
335
336  /**
337   * Increments multiple counters by their amounts in another Counters
338   * instance.
339   * @param other the other Counters instance
340   */
341  public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
342    for(G right : other) {
343      String groupName = right.getName();
344      G left = (isFrameworkGroup(groupName) ? fgroups : groups).get(groupName);
345      if (left == null) {
346        left = addGroup(groupName, right.getDisplayName());
347      }
348      left.incrAllCounters(right);
349    }
350  }
351
352  @Override
353  @SuppressWarnings("unchecked")
354  public boolean equals(Object genericRight) {
355    if (genericRight instanceof AbstractCounters<?, ?>) {
356      return Iterators.elementsEqual(iterator(),
357          ((AbstractCounters<C, G>)genericRight).iterator());
358    }
359    return false;
360  }
361
362  @Override
363  public int hashCode() {
364    return groups.hashCode();
365  }
366
367  /**
368   * Set the "writeAllCounters" option to true or false
369   * @param send  if true all counters would be serialized, otherwise only
370   *              framework counters would be serialized in
371   *              {@link #write(DataOutput)}
372   */
373  @InterfaceAudience.Private
374  public void setWriteAllCounters(boolean send) {
375    writeAllCounters = send;
376  }
377
378  /**
379   * Get the "writeAllCounters" option
380   * @return true of all counters would serialized
381   */
382  @InterfaceAudience.Private
383  public boolean getWriteAllCounters() {
384    return writeAllCounters;
385  }
386
387  @InterfaceAudience.Private
388  public Limits limits() {
389    return limits;
390  }
391}