001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.aggregate;
020
021import java.util.ArrayList;
022import java.util.Iterator;
023import java.util.TreeMap;
024import java.util.Map.Entry;
025import java.util.Arrays;
026
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029
030
031/**
032 * This class implements a value aggregator that computes the 
033 * histogram of a sequence of strings.
034 * 
035 */
036@InterfaceAudience.Public
037@InterfaceStability.Stable
038public class ValueHistogram implements ValueAggregator<String> {
039
040  TreeMap<Object, Object> items = null;
041
042  public ValueHistogram() {
043    items = new TreeMap<Object, Object>();
044  }
045
046  /**
047   * add the given val to the aggregator.
048   * 
049   * @param val the value to be added. It is expected to be a string
050   * in the form of xxxx\tnum, meaning xxxx has num occurrences.
051   */
052  public void addNextValue(Object val) {
053    String valCountStr = val.toString();
054    int pos = valCountStr.lastIndexOf("\t");
055    String valStr = valCountStr;
056    String countStr = "1";
057    if (pos >= 0) {
058      valStr = valCountStr.substring(0, pos);
059      countStr = valCountStr.substring(pos + 1);
060    }
061    
062    Long count = (Long) this.items.get(valStr);
063    long inc = Long.parseLong(countStr);
064
065    if (count == null) {
066      count = inc;
067    } else {
068      count = count.longValue() + inc;
069    }
070    items.put(valStr, count);
071  }
072
073  /**
074   * @return the string representation of this aggregator.
075   * It includes the following basic statistics of the histogram:
076   *    the number of unique values
077   *    the minimum value
078   *    the media value
079   *    the maximum value
080   *    the average value
081   *    the standard deviation
082   */
083  public String getReport() {
084    long[] counts = new long[items.size()];
085
086    StringBuffer sb = new StringBuffer();
087    Iterator<Object> iter = items.values().iterator();
088    int i = 0;
089    while (iter.hasNext()) {
090      Long count = (Long) iter.next();
091      counts[i] = count.longValue();
092      i += 1;
093    }
094    Arrays.sort(counts);
095    sb.append(counts.length);
096    i = 0;
097    long acc = 0;
098    while (i < counts.length) {
099      long nextVal = counts[i];
100      int j = i + 1;
101      while (j < counts.length && counts[j] == nextVal) {
102        j++;
103      }
104      acc += nextVal * (j - i);
105      i = j;
106    }
107    double average = 0.0;
108    double sd = 0.0;
109    if (counts.length > 0) {
110      sb.append("\t").append(counts[0]);
111      sb.append("\t").append(counts[counts.length / 2]);
112      sb.append("\t").append(counts[counts.length - 1]);
113
114      average = acc * 1.0 / counts.length;
115      sb.append("\t").append(average);
116
117      i = 0;
118      while (i < counts.length) {
119        double nextDiff = counts[i] - average;
120        sd += nextDiff * nextDiff;
121        i += 1;
122      }
123      sd = Math.sqrt(sd / counts.length);
124      sb.append("\t").append(sd);
125
126    }
127    return sb.toString();
128  }
129
130  /** 
131   * 
132   * @return a string representation of the list of value/frequence pairs of 
133   * the histogram
134   */
135  public String getReportDetails() {
136    StringBuffer sb = new StringBuffer();
137    Iterator<Entry<Object,Object>> iter = items.entrySet().iterator();
138    while (iter.hasNext()) {
139      Entry<Object,Object> en = iter.next();
140      Object val = en.getKey();
141      Long count = (Long) en.getValue();
142      sb.append("\t").append(val.toString()).append("\t").
143         append(count.longValue()).append("\n");
144    }
145    return sb.toString();
146  }
147
148  /**
149   *  @return a list value/frequence pairs.
150   *  The return value is expected to be used by the reducer.
151   */
152  public ArrayList<String> getCombinerOutput() {
153    ArrayList<String> retv = new ArrayList<String>();
154    Iterator<Entry<Object,Object>> iter = items.entrySet().iterator();
155
156    while (iter.hasNext()) {
157      Entry<Object,Object> en =  iter.next();
158      Object val = en.getKey();
159      Long count = (Long) en.getValue();
160      retv.add(val.toString() + "\t" + count.longValue());
161    }
162    return retv;
163  }
164
165  /** 
166   * 
167   * @return a TreeMap representation of the histogram
168   */
169  public TreeMap<Object,Object> getReportItems() {
170    return items;
171  }
172
173  /** 
174   * reset the aggregator
175   */
176  public void reset() {
177    items = new TreeMap<Object, Object>();
178  }
179
180}