/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.aggregate;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.Arrays;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;


/**
 * This class implements a value aggregator that computes the
 * histogram of a sequence of strings.
 *
 * Each distinct string is mapped to the number of times it has been seen.
 * The map is a {@link TreeMap}, so values are reported in key order.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ValueHistogram implements ValueAggregator<String> {

  /** Maps each distinct value (a String) to its occurrence count (a Long). */
  TreeMap<Object, Object> items = null;

  /**
   * Creates an empty histogram.
   */
  public ValueHistogram() {
    items = new TreeMap<Object, Object>();
  }

  /**
   * add the given val to the aggregator.
   *
   * @param val the value to be added. It is expected to be a string
   * in the form of xxxx\tnum, meaning xxxx has num occurrences. The string
   * is split at its last tab character; when it contains no tab at all,
   * the whole string is the value and it counts as one occurrence.
   * @throws NumberFormatException if the text after the last tab is not a
   * parsable long
   */
  public void addNextValue(Object val) {
    String valCountStr = val.toString();
    String valStr = valCountStr;
    String countStr = "1";

    // Split on the LAST tab so that the value part may itself contain tabs.
    int pos = valCountStr.lastIndexOf("\t");
    if (pos >= 0) {
      valStr = valCountStr.substring(0, pos);
      countStr = valCountStr.substring(pos + 1);
    }

    Long previous = (Long) this.items.get(valStr);
    long inc = Long.parseLong(countStr);

    long updated = (previous == null) ? inc : previous.longValue() + inc;
    items.put(valStr, Long.valueOf(updated));
  }

  /**
   * @return the string representation of this aggregator.
   * It is a tab-separated list of the following basic statistics of the
   * occurrence counts in the histogram:
   *    the number of unique values
   *    the minimum value
   *    the median value (the upper of the two middle values when the
   *      number of unique values is even)
   *    the maximum value
   *    the average value
   *    the standard deviation (population form, i.e. divided by the
   *      number of unique values)
   * When the histogram is empty, only the number of unique values (0)
   * is reported.
   */
  public String getReport() {
    long[] counts = new long[items.size()];
    int idx = 0;
    for (Iterator<Object> it = items.values().iterator(); it.hasNext();) {
      counts[idx++] = ((Long) it.next()).longValue();
    }
    // Sorting makes min, median and max simple positional lookups.
    Arrays.sort(counts);

    StringBuilder sb = new StringBuilder();
    sb.append(counts.length);
    if (counts.length == 0) {
      return sb.toString();
    }

    long total = 0;
    for (long c : counts) {
      total += c;
    }

    sb.append("\t").append(counts[0]);
    sb.append("\t").append(counts[counts.length / 2]);
    sb.append("\t").append(counts[counts.length - 1]);

    double average = total * 1.0 / counts.length;
    sb.append("\t").append(average);

    double squaredDiffs = 0.0;
    for (long c : counts) {
      double diff = c - average;
      squaredDiffs += diff * diff;
    }
    double sd = Math.sqrt(squaredDiffs / counts.length);
    sb.append("\t").append(sd);

    return sb.toString();
  }

  /**
   *
   * @return a string representation of the list of value/frequency pairs of
   * the histogram. Each pair is written on its own line as
   * \tvalue\tcount, in the key order of the underlying map.
   */
  public String getReportDetails() {
    StringBuilder sb = new StringBuilder();
    for (Entry<Object, Object> en : items.entrySet()) {
      Object val = en.getKey();
      Long count = (Long) en.getValue();
      sb.append("\t").append(val.toString()).append("\t").
         append(count.longValue()).append("\n");
    }
    return sb.toString();
  }

  /**
   *  @return a list value/frequency pairs, each formatted as value\tcount,
   *  which is the same form accepted by {@link #addNextValue(Object)}.
   *  The return value is expected to be used by the reducer.
   */
  public ArrayList<String> getCombinerOutput() {
    ArrayList<String> retv = new ArrayList<String>();
    for (Entry<Object, Object> en : items.entrySet()) {
      Object val = en.getKey();
      Long count = (Long) en.getValue();
      retv.add(val.toString() + "\t" + count.longValue());
    }
    return retv;
  }

  /**
   *
   * @return a TreeMap representation of the histogram. This is the
   * aggregator's own map, not a copy.
   */
  public TreeMap<Object,Object> getReportItems() {
    return items;
  }

  /**
   * reset the aggregator. The current map is replaced by a new empty one;
   * a map previously obtained from {@link #getReportItems()} is left
   * untouched.
   */
  public void reset() {
    items = new TreeMap<Object, Object>();
  }

}