001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.aggregate;
020    
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    import java.util.Set;
024    import java.util.TreeMap;
025    
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    
029    /**
030     * This class implements a value aggregator that dedupes a sequence of objects.
031     * 
032     */
033    @InterfaceAudience.Public
034    @InterfaceStability.Stable
035    public class UniqValueCount implements ValueAggregator<Object> {
036      public static final String MAX_NUM_UNIQUE_VALUES = 
037        "mapreduce.aggregate.max.num.unique.values";
038    
039      private TreeMap<Object, Object> uniqItems = null;
040    
041      private long numItems = 0;
042      
043      private long maxNumItems = Long.MAX_VALUE;
044    
045      /**
046       * the default constructor
047       * 
048       */
049      public UniqValueCount() {
050        this(Long.MAX_VALUE);
051      }
052      
053      /**
054       * constructor
055       * @param maxNum the limit in the number of unique values to keep.
056       *  
057       */
058      public UniqValueCount(long maxNum) {
059        uniqItems = new TreeMap<Object, Object>();
060        this.numItems = 0;
061        maxNumItems = Long.MAX_VALUE;
062        if (maxNum > 0 ) {
063          this.maxNumItems = maxNum;
064        }
065      }
066    
067      /**
068       * Set the limit on the number of unique values
069       * @param n the desired limit on the number of unique values
070       * @return the new limit on the number of unique values
071       */
072      public long setMaxItems(long n) {
073        if (n >= numItems) {
074          this.maxNumItems = n;
075        } else if (this.maxNumItems >= this.numItems) {
076          this.maxNumItems = this.numItems;
077        }
078        return this.maxNumItems;
079      }
080      
081      /**
082       * add a value to the aggregator
083       * 
084       * @param val
085       *          an object.
086       * 
087       */
088      public void addNextValue(Object val) {
089        if (this.numItems <= this.maxNumItems) {
090          uniqItems.put(val.toString(), "1");
091          this.numItems = this.uniqItems.size();
092        }
093      }
094    
095      /**
096       * @return return the number of unique objects aggregated
097       */
098      public String getReport() {
099        return "" + uniqItems.size();
100      }
101    
102      /**
103       * 
104       * @return the set of the unique objects
105       */
106      public Set<Object> getUniqueItems() {
107        return uniqItems.keySet();
108      }
109    
110      /**
111       * reset the aggregator
112       */
113      public void reset() {
114        uniqItems = new TreeMap<Object, Object>();
115      }
116    
117      /**
118       * @return return an array of the unique objects. The return value is
119       *         expected to be used by the a combiner.
120       */
121      public ArrayList<Object> getCombinerOutput() {
122        Object key = null;
123        Iterator<Object> iter = uniqItems.keySet().iterator();
124        ArrayList<Object> retv = new ArrayList<Object>();
125    
126        while (iter.hasNext()) {
127          key = iter.next();
128          retv.add(key);
129        }
130        return retv;
131      }
132    }