001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.aggregate;
020    
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    import java.util.TreeMap;
024    import java.util.Map.Entry;
025    import java.util.Arrays;
026    
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    
030    
031    /**
032     * This class implements a value aggregator that computes the 
033     * histogram of a sequence of strings.
034     * 
035     */
036    @InterfaceAudience.Public
037    @InterfaceStability.Stable
038    public class ValueHistogram implements ValueAggregator<String> {
039    
040      TreeMap<Object, Object> items = null;
041    
042      public ValueHistogram() {
043        items = new TreeMap<Object, Object>();
044      }
045    
046      /**
047       * add the given val to the aggregator.
048       * 
049       * @param val the value to be added. It is expected to be a string
050       * in the form of xxxx\tnum, meaning xxxx has num occurrences.
051       */
052      public void addNextValue(Object val) {
053        String valCountStr = val.toString();
054        int pos = valCountStr.lastIndexOf("\t");
055        String valStr = valCountStr;
056        String countStr = "1";
057        if (pos >= 0) {
058          valStr = valCountStr.substring(0, pos);
059          countStr = valCountStr.substring(pos + 1);
060        }
061        
062        Long count = (Long) this.items.get(valStr);
063        long inc = Long.parseLong(countStr);
064    
065        if (count == null) {
066          count = inc;
067        } else {
068          count = count.longValue() + inc;
069        }
070        items.put(valStr, count);
071      }
072    
073      /**
074       * @return the string representation of this aggregator.
075       * It includes the following basic statistics of the histogram:
076       *    the number of unique values
077       *    the minimum value
078       *    the media value
079       *    the maximum value
080       *    the average value
081       *    the standard deviation
082       */
083      public String getReport() {
084        long[] counts = new long[items.size()];
085    
086        StringBuffer sb = new StringBuffer();
087        Iterator<Object> iter = items.values().iterator();
088        int i = 0;
089        while (iter.hasNext()) {
090          Long count = (Long) iter.next();
091          counts[i] = count.longValue();
092          i += 1;
093        }
094        Arrays.sort(counts);
095        sb.append(counts.length);
096        i = 0;
097        long acc = 0;
098        while (i < counts.length) {
099          long nextVal = counts[i];
100          int j = i + 1;
101          while (j < counts.length && counts[j] == nextVal) {
102            j++;
103          }
104          acc += nextVal * (j - i);
105          i = j;
106        }
107        double average = 0.0;
108        double sd = 0.0;
109        if (counts.length > 0) {
110          sb.append("\t").append(counts[0]);
111          sb.append("\t").append(counts[counts.length / 2]);
112          sb.append("\t").append(counts[counts.length - 1]);
113    
114          average = acc * 1.0 / counts.length;
115          sb.append("\t").append(average);
116    
117          i = 0;
118          while (i < counts.length) {
119            double nextDiff = counts[i] - average;
120            sd += nextDiff * nextDiff;
121            i += 1;
122          }
123          sd = Math.sqrt(sd / counts.length);
124          sb.append("\t").append(sd);
125    
126        }
127        return sb.toString();
128      }
129    
130      /** 
131       * 
132       * @return a string representation of the list of value/frequence pairs of 
133       * the histogram
134       */
135      public String getReportDetails() {
136        StringBuffer sb = new StringBuffer();
137        Iterator<Entry<Object,Object>> iter = items.entrySet().iterator();
138        while (iter.hasNext()) {
139          Entry<Object,Object> en = iter.next();
140          Object val = en.getKey();
141          Long count = (Long) en.getValue();
142          sb.append("\t").append(val.toString()).append("\t").
143             append(count.longValue()).append("\n");
144        }
145        return sb.toString();
146      }
147    
148      /**
149       *  @return a list value/frequence pairs.
150       *  The return value is expected to be used by the reducer.
151       */
152      public ArrayList<String> getCombinerOutput() {
153        ArrayList<String> retv = new ArrayList<String>();
154        Iterator<Entry<Object,Object>> iter = items.entrySet().iterator();
155    
156        while (iter.hasNext()) {
157          Entry<Object,Object> en =  iter.next();
158          Object val = en.getKey();
159          Long count = (Long) en.getValue();
160          retv.add(val.toString() + "\t" + count.longValue());
161        }
162        return retv;
163      }
164    
165      /** 
166       * 
167       * @return a TreeMap representation of the histogram
168       */
169      public TreeMap<Object,Object> getReportItems() {
170        return items;
171      }
172    
173      /** 
174       * reset the aggregator
175       */
176      public void reset() {
177        items = new TreeMap<Object, Object>();
178      }
179    
180    }