001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred;
020    
021    import static org.apache.hadoop.mapreduce.util.CountersStrings.parseEscapedCompactString;
022    import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
023    
024    import java.io.DataInput;
025    import java.io.DataOutput;
026    import java.io.IOException;
027    import java.text.ParseException;
028    import java.util.Collection;
029    import java.util.HashMap;
030    import java.util.Iterator;
031    
032    import org.apache.commons.collections.IteratorUtils;
033    import org.apache.commons.logging.Log;
034    import org.apache.hadoop.classification.InterfaceAudience;
035    import org.apache.hadoop.classification.InterfaceStability;
036    import org.apache.hadoop.mapreduce.FileSystemCounter;
037    import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
038    import org.apache.hadoop.mapreduce.counters.AbstractCounters;
039    import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
040    import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
041    import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
042    import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
043    import org.apache.hadoop.mapreduce.counters.GenericCounter;
044    import org.apache.hadoop.mapreduce.counters.Limits;
045    import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
046    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
047    
048    import com.google.common.collect.Iterators;
049    
050    /**
051     * A set of named counters.
052     *
053     * <p><code>Counters</code> represent global counters, defined either by the
054     * Map-Reduce framework or applications. Each <code>Counter</code> can be of
055     * any {@link Enum} type.</p>
056     *
057     * <p><code>Counters</code> are bunched into {@link Group}s, each comprising of
058     * counters from a particular <code>Enum</code> class.
059     */
060    @InterfaceAudience.Public
061    @InterfaceStability.Stable
062    public class Counters
063        extends AbstractCounters<Counters.Counter, Counters.Group> {
064      
065      public static int MAX_COUNTER_LIMIT = Limits.getCountersMax();
066      public static int MAX_GROUP_LIMIT = Limits.getGroupsMax();
067      private static HashMap<String, String> depricatedCounterMap =
068          new HashMap<String, String>();
069      
070      static {
071        initDepricatedMap();
072      }
073      
074      public Counters() {
075        super(groupFactory);
076      }
077    
078      public Counters(org.apache.hadoop.mapreduce.Counters newCounters) {
079        super(newCounters, groupFactory);
080      }
081    
082      @SuppressWarnings({ "deprecation" })
083      private static void initDepricatedMap() {
084        depricatedCounterMap.put(FileInputFormat.Counter.class.getName(),
085          FileInputFormatCounter.class.getName());
086        depricatedCounterMap.put(FileOutputFormat.Counter.class.getName(),
087          FileOutputFormatCounter.class.getName());
088        depricatedCounterMap.put(
089          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.Counter.class
090            .getName(), FileInputFormatCounter.class.getName());
091        depricatedCounterMap.put(
092          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.class
093            .getName(), FileOutputFormatCounter.class.getName());
094      }
095    
096      private static String getNewGroupKey(String oldGroup) {
097        if (depricatedCounterMap.containsKey(oldGroup)) {
098          return depricatedCounterMap.get(oldGroup);
099        }
100        return null;
101      }
102      
103      /**
104       * Downgrade new {@link org.apache.hadoop.mapreduce.Counters} to old Counters
105       * @param newCounters new Counters
106       * @return old Counters instance corresponding to newCounters
107       */
108      static Counters downgrade(org.apache.hadoop.mapreduce.Counters newCounters) {
109        return new Counters(newCounters);
110      }
111    
112      public synchronized Group getGroup(String groupName) {
113        return super.getGroup(groupName);
114      }
115    
116      @SuppressWarnings("unchecked")
117      public synchronized Collection<String> getGroupNames() {
118        return IteratorUtils.toList(super.getGroupNames().iterator());
119      }
120    
121      public synchronized String makeCompactString() {
122        StringBuilder builder = new StringBuilder();
123        boolean first = true;
124        for(Group group: this){
125          for(Counter counter: group) {
126            if (first) {
127              first = false;
128            } else {
129              builder.append(',');
130            }
131            builder.append(group.getDisplayName());
132            builder.append('.');
133            builder.append(counter.getDisplayName());
134            builder.append(':');
135            builder.append(counter.getCounter());
136          }
137        }
138        return builder.toString();
139      }
140      
141      /**
142       * A counter record, comprising its name and value.
143       */
144      @InterfaceAudience.Public
145      @InterfaceStability.Stable
146      public static class Counter implements org.apache.hadoop.mapreduce.Counter {
147        org.apache.hadoop.mapreduce.Counter realCounter;
148    
149        Counter(org.apache.hadoop.mapreduce.Counter counter) {
150          this.realCounter = counter;
151        }
152    
153        public Counter() {
154          this(new GenericCounter());
155        }
156    
157        @SuppressWarnings("deprecation")
158        @Override
159        public void setDisplayName(String displayName) {
160          realCounter.setDisplayName(displayName);
161        }
162    
163        @Override
164        public String getName() {
165          return realCounter.getName();
166        }
167    
168        @Override
169        public String getDisplayName() {
170          return realCounter.getDisplayName();
171        }
172    
173        @Override
174        public long getValue() {
175          return realCounter.getValue();
176        }
177    
178        @Override
179        public void setValue(long value) {
180          realCounter.setValue(value);
181        }
182    
183        @Override
184        public void increment(long incr) {
185          realCounter.increment(incr);
186        }
187    
188        @Override
189        public void write(DataOutput out) throws IOException {
190          realCounter.write(out);
191        }
192    
193        @Override
194        public void readFields(DataInput in) throws IOException {
195          realCounter.readFields(in);
196        }
197    
198        /**
199         * Returns the compact stringified version of the counter in the format
200         * [(actual-name)(display-name)(value)]
201         * @return the stringified result
202         */
203        public String makeEscapedCompactString() {
204          return toEscapedCompactString(realCounter);
205        }
206    
207        /**
208         * Checks for (content) equality of two (basic) counters
209         * @param counter to compare
210         * @return true if content equals
211         * @deprecated
212         */
213        @Deprecated
214        public boolean contentEquals(Counter counter) {
215          return realCounter.equals(counter.getUnderlyingCounter());
216        }
217    
218        /**
219         * @return the value of the counter
220         */
221        public long getCounter() {
222          return realCounter.getValue();
223        }
224    
225        @Override
226        public org.apache.hadoop.mapreduce.Counter getUnderlyingCounter() {
227          return realCounter;
228        }
229        
230        @Override
231        public synchronized boolean equals(Object genericRight) {
232          if (genericRight instanceof Counter) {
233            synchronized (genericRight) {
234              Counter right = (Counter) genericRight;
235              return getName().equals(right.getName()) &&
236                     getDisplayName().equals(right.getDisplayName()) &&
237                     getValue() == right.getValue();
238            }
239          }
240          return false;
241        }
242        
243        @Override
244        public int hashCode() {
245          return realCounter.hashCode();
246        }
247      }
248    
249    
250      /**
251       *  <code>Group</code> of counters, comprising of counters from a particular
252       *  counter {@link Enum} class.
253       *
254       *  <p><code>Group</code>handles localization of the class name and the
255       *  counter names.</p>
256       */
257      @InterfaceAudience.Public
258      @InterfaceStability.Stable
259      public static class Group implements CounterGroupBase<Counter> {
260        private CounterGroupBase<Counter> realGroup;
261        
262        protected Group() {
263          realGroup = null;
264        }
265        
266        Group(GenericGroup group) {
267          this.realGroup = group;
268        }
269        Group(FSGroupImpl group) {
270          this.realGroup = group;
271        }
272        
273        @SuppressWarnings({ "unchecked", "rawtypes" })
274        Group(FrameworkGroupImpl group) {
275          this.realGroup = group;
276        }
277        
278        /**
279         * @param counterName the name of the counter
280         * @return the value of the specified counter, or 0 if the counter does
281         * not exist.
282         */
283        public long getCounter(String counterName)  {
284          return getCounterValue(realGroup, counterName);
285        }
286    
287        /**
288         * @return the compact stringified version of the group in the format
289         * {(actual-name)(display-name)(value)[][][]} where [] are compact strings
290         * for the counters within.
291         */
292        public String makeEscapedCompactString() {
293          return toEscapedCompactString(realGroup);
294        }
295    
296        /**
297         * Get the counter for the given id and create it if it doesn't exist.
298         * @param id the numeric id of the counter within the group
299         * @param name the internal counter name
300         * @return the counter
301         * @deprecated use {@link #findCounter(String)} instead
302         */
303        @Deprecated
304        public Counter getCounter(int id, String name) {
305          return findCounter(name);
306        }
307    
308        /**
309         * Get the counter for the given name and create it if it doesn't exist.
310         * @param name the internal counter name
311         * @return the counter
312         */
313        public Counter getCounterForName(String name) {
314          return findCounter(name);
315        }
316    
317        @Override
318        public void write(DataOutput out) throws IOException {
319         realGroup.write(out); 
320        }
321    
322        @Override
323        public void readFields(DataInput in) throws IOException {
324          realGroup.readFields(in);
325        }
326    
327        @Override
328        public Iterator<Counter> iterator() {
329          return realGroup.iterator();
330        }
331    
332        @Override
333        public String getName() {
334          return realGroup.getName();
335        }
336    
337        @Override
338        public String getDisplayName() {
339          return realGroup.getDisplayName();
340        }
341    
342        @Override
343        public void setDisplayName(String displayName) {
344          realGroup.setDisplayName(displayName);
345        }
346    
347        @Override
348        public void addCounter(Counter counter) {
349          realGroup.addCounter(counter);
350        }
351    
352        @Override
353        public Counter addCounter(String name, String displayName, long value) {
354          return realGroup.addCounter(name, displayName, value);
355        }
356    
357        @Override
358        public Counter findCounter(String counterName, String displayName) {
359          return realGroup.findCounter(counterName, displayName);
360        }
361    
362        @Override
363        public Counter findCounter(String counterName, boolean create) {
364          return realGroup.findCounter(counterName, create);
365        }
366    
367        @Override
368        public Counter findCounter(String counterName) {
369          return realGroup.findCounter(counterName);
370        }
371    
372        @Override
373        public int size() {
374          return realGroup.size();
375        }
376    
377        @Override
378        public void incrAllCounters(CounterGroupBase<Counter> rightGroup) {
379          realGroup.incrAllCounters(rightGroup);
380        }
381        
382        @Override
383        public CounterGroupBase<Counter> getUnderlyingGroup() {
384          return realGroup;
385        }
386    
387        @Override
388        public synchronized boolean equals(Object genericRight) {
389          if (genericRight instanceof CounterGroupBase<?>) {
390            @SuppressWarnings("unchecked")
391            CounterGroupBase<Counter> right = ((CounterGroupBase<Counter>) 
392            genericRight).getUnderlyingGroup();
393            return Iterators.elementsEqual(iterator(), right.iterator());
394          }
395          return false;
396        }
397    
398        @Override
399        public int hashCode() {
400          return realGroup.hashCode();
401        }
402      }
403    
404      // All the group impls need this for legacy group interface
405      static long getCounterValue(CounterGroupBase<Counter> group, String counterName) {
406        Counter counter = group.findCounter(counterName, false);
407        if (counter != null) return counter.getValue();
408        return 0L;
409      }
410    
411      // Mix the generic group implementation into the Group interface
412      private static class GenericGroup extends AbstractCounterGroup<Counter> {
413    
414        GenericGroup(String name, String displayName, Limits limits) {
415          super(name, displayName, limits);
416        }
417    
418        @Override
419        protected Counter newCounter(String counterName, String displayName,
420                                     long value) {
421          return new Counter(new GenericCounter(counterName, displayName, value));
422        }
423    
424        @Override
425        protected Counter newCounter() {
426          return new Counter();
427        }
428        
429        @Override
430        public CounterGroupBase<Counter> getUnderlyingGroup() {
431         return this;
432        }
433      }
434    
435      // Mix the framework group implementation into the Group interface
436      private static class FrameworkGroupImpl<T extends Enum<T>>
437          extends FrameworkCounterGroup<T, Counter> {
438    
439        FrameworkGroupImpl(Class<T> cls) {
440          super(cls);
441        }
442    
443        @Override
444        protected Counter newCounter(T key) {
445          return new Counter(new FrameworkCounter<T>(key, getName()));
446        }
447    
448        @Override
449        public CounterGroupBase<Counter> getUnderlyingGroup() {
450          return this;
451        }
452      }
453    
454      // Mix the file system counter group implementation into the Group interface
455      private static class FSGroupImpl extends FileSystemCounterGroup<Counter> {
456    
457        @Override
458        protected Counter newCounter(String scheme, FileSystemCounter key) {
459          return new Counter(new FSCounter(scheme, key));
460        }
461    
462        @Override
463        public CounterGroupBase<Counter> getUnderlyingGroup() {
464          return this;
465        }
466      }
467    
468      public synchronized Counter findCounter(String group, String name) {
469        if (name.equals("MAP_INPUT_BYTES")) {
470          LOG.warn("Counter name MAP_INPUT_BYTES is deprecated. " +
471                   "Use FileInputFormatCounters as group name and " +
472                   " BYTES_READ as counter name instead");
473          return findCounter(FileInputFormatCounter.BYTES_READ);
474        }
475        String newGroupKey = getNewGroupKey(group);
476        if (newGroupKey != null) {
477          group = newGroupKey;
478        }
479        return getGroup(group).getCounterForName(name);
480      }
481    
482      /**
483       * Provide factory methods for counter group factory implementation.
484       * See also the GroupFactory in
485       *  {@link org.apache.hadoop.mapreduce.Counters mapreduce.Counters}
486       */
487      static class GroupFactory extends CounterGroupFactory<Counter, Group> {
488    
489        @Override
490        protected <T extends Enum<T>>
491        FrameworkGroupFactory<Group> newFrameworkGroupFactory(final Class<T> cls) {
492          return new FrameworkGroupFactory<Group>() {
493            @Override public Group newGroup(String name) {
494              return new Group(new FrameworkGroupImpl<T>(cls)); // impl in this package
495            }
496          };
497        }
498    
499        @Override
500        protected Group newGenericGroup(String name, String displayName,
501                                        Limits limits) {
502          return new Group(new GenericGroup(name, displayName, limits));
503        }
504    
505        @Override
506        protected Group newFileSystemGroup() {
507          return new Group(new FSGroupImpl());
508        }
509      }
510    
511      private static final GroupFactory groupFactory = new GroupFactory();
512    
513      /**
514       * Find a counter by using strings
515       * @param group the name of the group
516       * @param id the id of the counter within the group (0 to N-1)
517       * @param name the internal name of the counter
518       * @return the counter for that name
519       * @deprecated use {@link #findCounter(String, String)} instead
520       */
521      @Deprecated
522      public Counter findCounter(String group, int id, String name) {
523        return findCounter(group, name);
524      }
525    
526      /**
527       * Increments the specified counter by the specified amount, creating it if
528       * it didn't already exist.
529       * @param key identifies a counter
530       * @param amount amount by which counter is to be incremented
531       */
532      public void incrCounter(Enum<?> key, long amount) {
533        findCounter(key).increment(amount);
534      }
535    
536      /**
537       * Increments the specified counter by the specified amount, creating it if
538       * it didn't already exist.
539       * @param group the name of the group
540       * @param counter the internal name of the counter
541       * @param amount amount by which counter is to be incremented
542       */
543      public void incrCounter(String group, String counter, long amount) {
544        findCounter(group, counter).increment(amount);
545      }
546    
547      /**
548       * Returns current value of the specified counter, or 0 if the counter
549       * does not exist.
550       * @param key the counter enum to lookup
551       * @return the counter value or 0 if counter not found
552       */
553      public synchronized long getCounter(Enum<?> key) {
554        return findCounter(key).getValue();
555      }
556    
557      /**
558       * Increments multiple counters by their amounts in another Counters
559       * instance.
560       * @param other the other Counters instance
561       */
562      public synchronized void incrAllCounters(Counters other) {
563        for (Group otherGroup: other) {
564          Group group = getGroup(otherGroup.getName());
565          group.setDisplayName(otherGroup.getDisplayName());
566          for (Counter otherCounter : otherGroup) {
567            Counter counter = group.getCounterForName(otherCounter.getName());
568            counter.setDisplayName(otherCounter.getDisplayName());
569            counter.increment(otherCounter.getValue());
570          }
571        }
572      }
573    
574      /**
575       * @return the total number of counters
576       * @deprecated use {@link #countCounters()} instead
577       */
578      public int size() {
579        return countCounters();
580      }
581    
582      /**
583       * Convenience method for computing the sum of two sets of counters.
584       * @param a the first counters
585       * @param b the second counters
586       * @return a new summed counters object
587       */
588      public static Counters sum(Counters a, Counters b) {
589        Counters counters = new Counters();
590        counters.incrAllCounters(a);
591        counters.incrAllCounters(b);
592        return counters;
593      }
594    
595      /**
596       * Logs the current counter values.
597       * @param log The log to use.
598       */
599      public void log(Log log) {
600        log.info("Counters: " + size());
601        for(Group group: this) {
602          log.info("  " + group.getDisplayName());
603          for (Counter counter: group) {
604            log.info("    " + counter.getDisplayName() + "=" +
605                     counter.getCounter());
606          }
607        }
608      }
609    
610      /**
611       * Represent the counter in a textual format that can be converted back to
612       * its object form
613       * @return the string in the following format
614       * {(groupName)(group-displayName)[(counterName)(displayName)(value)][]*}*
615       */
616      public String makeEscapedCompactString() {
617        return toEscapedCompactString(this);
618      }
619    
620      /**
621       * Convert a stringified (by {@link #makeEscapedCompactString()} counter
622       * representation into a counter object.
623       * @param compactString to parse
624       * @return a new counters object
625       * @throws ParseException
626       */
627      public static Counters fromEscapedCompactString(String compactString)
628          throws ParseException {
629        return parseEscapedCompactString(compactString, new Counters());
630      }
631    
632      /**
633       * Counter exception thrown when the number of counters exceed the limit
634       */
635      public static class CountersExceededException extends RuntimeException {
636    
637        private static final long serialVersionUID = 1L;
638    
639        public CountersExceededException(String msg) {
640          super(msg);
641        }
642    
643        // Only allows chaining of related exceptions
644        public CountersExceededException(CountersExceededException cause) {
645          super(cause);
646        }
647      }
648    }