001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred;
020
021import static org.apache.hadoop.mapreduce.util.CountersStrings.parseEscapedCompactString;
022import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
023
024import java.io.DataInput;
025import java.io.DataOutput;
026import java.io.IOException;
027import java.text.ParseException;
028import java.util.Collection;
029import java.util.Iterator;
030
031import org.apache.commons.collections.IteratorUtils;
032import org.apache.commons.logging.Log;
033import org.apache.hadoop.classification.InterfaceAudience;
034import org.apache.hadoop.classification.InterfaceStability;
035import org.apache.hadoop.mapreduce.FileSystemCounter;
036import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
037import org.apache.hadoop.mapreduce.counters.AbstractCounters;
038import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
039import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
040import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
041import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
042import org.apache.hadoop.mapreduce.counters.GenericCounter;
043import org.apache.hadoop.mapreduce.counters.Limits;
044import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
045import org.apache.hadoop.mapreduce.util.CountersStrings;
046
047import com.google.common.collect.Iterators;
048
049/**
050 * A set of named counters.
051 *
052 * <p><code>Counters</code> represent global counters, defined either by the
053 * Map-Reduce framework or applications. Each <code>Counter</code> can be of
054 * any {@link Enum} type.</p>
055 *
056 * <p><code>Counters</code> are bunched into {@link Group}s, each comprising of
057 * counters from a particular <code>Enum</code> class.
058 */
059@InterfaceAudience.Public
060@InterfaceStability.Stable
061public class Counters
062    extends AbstractCounters<Counters.Counter, Counters.Group> {
063  
064  public static int MAX_COUNTER_LIMIT = Limits.COUNTERS_MAX;
065  
066  public Counters() {
067    super(groupFactory);
068  }
069
070  public Counters(org.apache.hadoop.mapreduce.Counters newCounters) {
071    super(newCounters, groupFactory);
072  }
073
074  /**
075   * Downgrade new {@link org.apache.hadoop.mapreduce.Counters} to old Counters
076   * @param newCounters new Counters
077   * @return old Counters instance corresponding to newCounters
078   */
079  static Counters downgrade(org.apache.hadoop.mapreduce.Counters newCounters) {
080    return new Counters(newCounters);
081  }
082
083  public synchronized Group getGroup(String groupName) {
084    return super.getGroup(groupName);
085  }
086
087  @SuppressWarnings("unchecked")
088  public synchronized Collection<String> getGroupNames() {
089    return IteratorUtils.toList(super.getGroupNames().iterator());
090  }
091
092  public synchronized String makeCompactString() {
093    StringBuilder builder = new StringBuilder();
094    boolean first = true;
095    for(Group group: this){
096      for(Counter counter: group) {
097        if (first) {
098          first = false;
099        } else {
100          builder.append(',');
101        }
102        builder.append(group.getDisplayName());
103        builder.append('.');
104        builder.append(counter.getDisplayName());
105        builder.append(':');
106        builder.append(counter.getCounter());
107      }
108    }
109    return builder.toString();
110  }
111  
112  /**
113   * A counter record, comprising its name and value.
114   */
115  public static class Counter implements org.apache.hadoop.mapreduce.Counter {
116    org.apache.hadoop.mapreduce.Counter realCounter;
117
118    Counter(org.apache.hadoop.mapreduce.Counter counter) {
119      this.realCounter = counter;
120    }
121
122    public Counter() {
123      this(new GenericCounter());
124    }
125
126    @SuppressWarnings("deprecation")
127    @Override
128    public void setDisplayName(String displayName) {
129      realCounter.setDisplayName(displayName);
130    }
131
132    @Override
133    public String getName() {
134      return realCounter.getName();
135    }
136
137    @Override
138    public String getDisplayName() {
139      return realCounter.getDisplayName();
140    }
141
142    @Override
143    public long getValue() {
144      return realCounter.getValue();
145    }
146
147    @Override
148    public void setValue(long value) {
149      realCounter.setValue(value);
150    }
151
152    @Override
153    public void increment(long incr) {
154      realCounter.increment(incr);
155    }
156
157    @Override
158    public void write(DataOutput out) throws IOException {
159      realCounter.write(out);
160    }
161
162    @Override
163    public void readFields(DataInput in) throws IOException {
164      realCounter.readFields(in);
165    }
166
167    /**
168     * Returns the compact stringified version of the counter in the format
169     * [(actual-name)(display-name)(value)]
170     * @return the stringified result
171     */
172    public String makeEscapedCompactString() {
173      return toEscapedCompactString(realCounter);
174    }
175
176    /**
177     * Checks for (content) equality of two (basic) counters
178     * @param counter to compare
179     * @return true if content equals
180     * @deprecated
181     */
182    @Deprecated
183    public boolean contentEquals(Counter counter) {
184      return realCounter.equals(counter.getUnderlyingCounter());
185    }
186
187    /**
188     * @return the value of the counter
189     */
190    public long getCounter() {
191      return realCounter.getValue();
192    }
193
194    @Override
195    public org.apache.hadoop.mapreduce.Counter getUnderlyingCounter() {
196      return realCounter;
197    }
198    
199    @Override
200    public synchronized boolean equals(Object genericRight) {
201      if (genericRight instanceof Counter) {
202        synchronized (genericRight) {
203          Counter right = (Counter) genericRight;
204          return getName().equals(right.getName()) &&
205                 getDisplayName().equals(right.getDisplayName()) &&
206                 getValue() == right.getValue();
207        }
208      }
209      return false;
210    }
211    
212    @Override
213    public int hashCode() {
214      return realCounter.hashCode();
215    }
216  }
217
218
219  /**
220   *  <code>Group</code> of counters, comprising of counters from a particular
221   *  counter {@link Enum} class.
222   *
223   *  <p><code>Group</code>handles localization of the class name and the
224   *  counter names.</p>
225   */
226  public static class Group implements CounterGroupBase<Counter> {
227    private CounterGroupBase<Counter> realGroup;
228    
229    Group(GenericGroup group) {
230      this.realGroup = group;
231    }
232    Group(FSGroupImpl group) {
233      this.realGroup = group;
234    }
235    
236    @SuppressWarnings({ "unchecked", "rawtypes" })
237    Group(FrameworkGroupImpl group) {
238      this.realGroup = group;
239    }
240    
241    /**
242     * @param counterName the name of the counter
243     * @return the value of the specified counter, or 0 if the counter does
244     * not exist.
245     */
246    public long getCounter(String counterName)  {
247      return getCounterValue(realGroup, counterName);
248    }
249
250    /**
251     * @return the compact stringified version of the group in the format
252     * {(actual-name)(display-name)(value)[][][]} where [] are compact strings
253     * for the counters within.
254     */
255    public String makeEscapedCompactString() {
256      return toEscapedCompactString(realGroup);
257    }
258
259    /**
260     * Get the counter for the given id and create it if it doesn't exist.
261     * @param id the numeric id of the counter within the group
262     * @param name the internal counter name
263     * @return the counter
264     * @deprecated use {@link #findCounter(String)} instead
265     */
266    @Deprecated
267    public Counter getCounter(int id, String name) {
268      return findCounter(name);
269    }
270
271    /**
272     * Get the counter for the given name and create it if it doesn't exist.
273     * @param name the internal counter name
274     * @return the counter
275     */
276    public Counter getCounterForName(String name) {
277      return findCounter(name);
278    }
279
280    @Override
281    public void write(DataOutput out) throws IOException {
282     realGroup.write(out); 
283    }
284
285    @Override
286    public void readFields(DataInput in) throws IOException {
287      realGroup.readFields(in);
288    }
289
290    @Override
291    public Iterator<Counter> iterator() {
292      return realGroup.iterator();
293    }
294
295    @Override
296    public String getName() {
297      return realGroup.getName();
298    }
299
300    @Override
301    public String getDisplayName() {
302      return realGroup.getDisplayName();
303    }
304
305    @Override
306    public void setDisplayName(String displayName) {
307      realGroup.setDisplayName(displayName);
308    }
309
310    @Override
311    public void addCounter(Counter counter) {
312      realGroup.addCounter(counter);
313    }
314
315    @Override
316    public Counter addCounter(String name, String displayName, long value) {
317      return realGroup.addCounter(name, displayName, value);
318    }
319
320    @Override
321    public Counter findCounter(String counterName, String displayName) {
322      return realGroup.findCounter(counterName, displayName);
323    }
324
325    @Override
326    public Counter findCounter(String counterName, boolean create) {
327      return realGroup.findCounter(counterName, create);
328    }
329
330    @Override
331    public Counter findCounter(String counterName) {
332      return realGroup.findCounter(counterName);
333    }
334
335    @Override
336    public int size() {
337      return realGroup.size();
338    }
339
340    @Override
341    public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
342      realGroup.incrAllCounters(rightGroup);
343    }
344    
345    @Override
346    public CounterGroupBase<Counter> getUnderlyingGroup() {
347      return realGroup;
348    }
349
350    @Override
351    public synchronized boolean equals(Object genericRight) {
352      if (genericRight instanceof CounterGroupBase<?>) {
353        @SuppressWarnings("unchecked")
354        CounterGroupBase<Counter> right = ((CounterGroupBase<Counter>) 
355        genericRight).getUnderlyingGroup();
356        return Iterators.elementsEqual(iterator(), right.iterator());
357      }
358      return false;
359    }
360
361    @Override
362    public int hashCode() {
363      return realGroup.hashCode();
364    }
365  }
366
367  // All the group impls need this for legacy group interface
368  static long getCounterValue(CounterGroupBase<Counter> group, String counterName) {
369    Counter counter = group.findCounter(counterName, false);
370    if (counter != null) return counter.getValue();
371    return 0L;
372  }
373
374  // Mix the generic group implementation into the Group interface
375  private static class GenericGroup extends AbstractCounterGroup<Counter> {
376
377    GenericGroup(String name, String displayName, Limits limits) {
378      super(name, displayName, limits);
379    }
380
381    @Override
382    protected Counter newCounter(String counterName, String displayName,
383                                 long value) {
384      return new Counter(new GenericCounter(counterName, displayName, value));
385    }
386
387    @Override
388    protected Counter newCounter() {
389      return new Counter();
390    }
391    
392    @Override
393    public CounterGroupBase<Counter> getUnderlyingGroup() {
394     return this;
395    }
396  }
397
398  // Mix the framework group implementation into the Group interface
399  private static class FrameworkGroupImpl<T extends Enum<T>>
400      extends FrameworkCounterGroup<T, Counter> {
401
402    // Mix the framework counter implementation into the Counter interface
403    class FrameworkCounterImpl extends FrameworkCounter {
404      FrameworkCounterImpl(T key) {
405        super(key);
406      }
407
408    }
409
410    FrameworkGroupImpl(Class<T> cls) {
411      super(cls);
412    }
413
414    @Override
415    protected Counter newCounter(T key) {
416      return new Counter(new FrameworkCounterImpl(key));
417    }
418
419    @Override
420    public CounterGroupBase<Counter> getUnderlyingGroup() {
421      return this;
422    }
423  }
424
425  // Mix the file system counter group implementation into the Group interface
426  private static class FSGroupImpl extends FileSystemCounterGroup<Counter> {
427
428    private class FSCounterImpl extends FSCounter {
429
430      FSCounterImpl(String scheme, FileSystemCounter key) {
431        super(scheme, key);
432      }
433
434    }
435
436    @Override
437    protected Counter newCounter(String scheme, FileSystemCounter key) {
438      return new Counter(new FSCounterImpl(scheme, key));
439    }
440
441    @Override
442    public CounterGroupBase<Counter> getUnderlyingGroup() {
443      return this;
444    }
445  }
446
447  public synchronized Counter findCounter(String group, String name) {
448    if (name.equals("MAP_INPUT_BYTES")) {
449      LOG.warn("Counter name MAP_INPUT_BYTES is deprecated. " +
450               "Use FileInputFormatCounters as group name and " +
451               " BYTES_READ as counter name instead");
452      return findCounter(FileInputFormatCounter.BYTES_READ);
453    }
454    return getGroup(group).getCounterForName(name);
455  }
456
457  /**
458   * Provide factory methods for counter group factory implementation.
459   * See also the GroupFactory in
460   *  {@link org.apache.hadoop.mapreduce.Counters mapreduce.Counters}
461   */
462  static class GroupFactory extends CounterGroupFactory<Counter, Group> {
463
464    @Override
465    protected <T extends Enum<T>>
466    FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
467      return new FrameworkGroupFactory<Group>() {
468        @Override public Group newGroup(String name) {
469          return new Group(new FrameworkGroupImpl<T>(cls)); // impl in this package
470        }
471      };
472    }
473
474    @Override
475    protected Group newGenericGroup(String name, String displayName,
476                                    Limits limits) {
477      return new Group(new GenericGroup(name, displayName, limits));
478    }
479
480    @Override
481    protected Group newFileSystemGroup() {
482      return new Group(new FSGroupImpl());
483    }
484  }
485
486  private static final GroupFactory groupFactory = new GroupFactory();
487
488  /**
489   * Find a counter by using strings
490   * @param group the name of the group
491   * @param id the id of the counter within the group (0 to N-1)
492   * @param name the internal name of the counter
493   * @return the counter for that name
494   * @deprecated use {@link #findCounter(String, String)} instead
495   */
496  @Deprecated
497  public Counter findCounter(String group, int id, String name) {
498    return findCounter(group, name);
499  }
500
501  /**
502   * Increments the specified counter by the specified amount, creating it if
503   * it didn't already exist.
504   * @param key identifies a counter
505   * @param amount amount by which counter is to be incremented
506   */
507  public void incrCounter(Enum<?> key, long amount) {
508    findCounter(key).increment(amount);
509  }
510
511  /**
512   * Increments the specified counter by the specified amount, creating it if
513   * it didn't already exist.
514   * @param group the name of the group
515   * @param counter the internal name of the counter
516   * @param amount amount by which counter is to be incremented
517   */
518  public void incrCounter(String group, String counter, long amount) {
519    findCounter(group, counter).increment(amount);
520  }
521
522  /**
523   * Returns current value of the specified counter, or 0 if the counter
524   * does not exist.
525   * @param key the counter enum to lookup
526   * @return the counter value or 0 if counter not found
527   */
528  public synchronized long getCounter(Enum<?> key) {
529    return findCounter(key).getValue();
530  }
531
532  /**
533   * Increments multiple counters by their amounts in another Counters
534   * instance.
535   * @param other the other Counters instance
536   */
537  public synchronized void incrAllCounters(Counters other) {
538    for (Group otherGroup: other) {
539      Group group = getGroup(otherGroup.getName());
540      group.setDisplayName(otherGroup.getDisplayName());
541      for (Counter otherCounter : otherGroup) {
542        Counter counter = group.getCounterForName(otherCounter.getName());
543        counter.setDisplayName(otherCounter.getDisplayName());
544        counter.increment(otherCounter.getValue());
545      }
546    }
547  }
548
549  /**
550   * @return the total number of counters
551   * @deprecated use {@link #countCounters()} instead
552   */
553  public int size() {
554    return countCounters();
555  }
556
557  /**
558   * Convenience method for computing the sum of two sets of counters.
559   * @param a the first counters
560   * @param b the second counters
561   * @return a new summed counters object
562   */
563  public static Counters sum(Counters a, Counters b) {
564    Counters counters = new Counters();
565    counters.incrAllCounters(a);
566    counters.incrAllCounters(b);
567    return counters;
568  }
569
570  /**
571   * Logs the current counter values.
572   * @param log The log to use.
573   */
574  public void log(Log log) {
575    log.info("Counters: " + size());
576    for(Group group: this) {
577      log.info("  " + group.getDisplayName());
578      for (Counter counter: group) {
579        log.info("    " + counter.getDisplayName() + "=" +
580                 counter.getCounter());
581      }
582    }
583  }
584
585  /**
586   * Represent the counter in a textual format that can be converted back to
587   * its object form
588   * @return the string in the following format
589   * {(groupName)(group-displayName)[(counterName)(displayName)(value)][]*}*
590   */
591  public String makeEscapedCompactString() {
592    return toEscapedCompactString(this);
593  }
594
595  /**
596   * Convert a stringified (by {@link #makeEscapedCompactString()} counter
597   * representation into a counter object.
598   * @param compactString to parse
599   * @return a new counters object
600   * @throws ParseException
601   */
602  public static Counters fromEscapedCompactString(String compactString)
603      throws ParseException {
604    return parseEscapedCompactString(compactString, new Counters());
605  }
606}