001    /**
002     *
003     * Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
004     * All rights reserved.
005     * Redistribution and use in source and binary forms, with or 
006     * without modification, are permitted provided that the following 
007     * conditions are met:
008     *  - Redistributions of source code must retain the above copyright 
009     *    notice, this list of conditions and the following disclaimer.
010     *  - Redistributions in binary form must reproduce the above copyright 
011     *    notice, this list of conditions and the following disclaimer in 
012     *    the documentation and/or other materials provided with the distribution.
013     *  - Neither the name of the University Catholique de Louvain - UCL
014     *    nor the names of its contributors may be used to endorse or 
015     *    promote products derived from this software without specific prior 
016     *    written permission.
017     *    
018     * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
019     * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
020     * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
021     * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
022     * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
023     * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
024     * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
025     * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
026     * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
027     * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
028     * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
029     * POSSIBILITY OF SUCH DAMAGE.
030     */
031    
032    /**
033     * Licensed to the Apache Software Foundation (ASF) under one
034     * or more contributor license agreements.  See the NOTICE file
035     * distributed with this work for additional information
036     * regarding copyright ownership.  The ASF licenses this file
037     * to you under the Apache License, Version 2.0 (the
038     * "License"); you may not use this file except in compliance
039     * with the License.  You may obtain a copy of the License at
040     *
041     *     http://www.apache.org/licenses/LICENSE-2.0
042     *
043     * Unless required by applicable law or agreed to in writing, software
044     * distributed under the License is distributed on an "AS IS" BASIS,
045     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
046     * See the License for the specific language governing permissions and
047     * limitations under the License.
048     */
049    
050    package org.apache.hadoop.util.bloom;
051    
052    import java.io.DataInput;
053    import java.io.DataOutput;
054    import java.io.IOException;
055    
056    import org.apache.hadoop.classification.InterfaceAudience;
057    import org.apache.hadoop.classification.InterfaceStability;
058    
059    /**
060     * Implements a <i>counting Bloom filter</i>, as defined by Fan et al. in a ToN
061     * 2000 paper.
062     * <p>
063     * A counting Bloom filter is an improvement to standard a Bloom filter as it
064     * allows dynamic additions and deletions of set membership information.  This 
065     * is achieved through the use of a counting vector instead of a bit vector.
066     * <p>
067     * Originally created by
068     * <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
069     *
070     * @see Filter The general behavior of a filter
071     * 
072     * @see <a href="http://portal.acm.org/citation.cfm?id=343571.343572">Summary cache: a scalable wide-area web cache sharing protocol</a>
073     */
074    @InterfaceAudience.Public
075    @InterfaceStability.Stable
076    public final class CountingBloomFilter extends Filter {
077      /** Storage for the counting buckets */
078      private long[] buckets;
079    
080      /** We are using 4bit buckets, so each bucket can count to 15 */
081      private final static long BUCKET_MAX_VALUE = 15;
082    
083      /** Default constructor - use with readFields */
084      public CountingBloomFilter() {}
085      
086      /**
087       * Constructor
088       * @param vectorSize The vector size of <i>this</i> filter.
089       * @param nbHash The number of hash function to consider.
090       * @param hashType type of the hashing function (see
091       * {@link org.apache.hadoop.util.hash.Hash}).
092       */
093      public CountingBloomFilter(int vectorSize, int nbHash, int hashType) {
094        super(vectorSize, nbHash, hashType);
095        buckets = new long[buckets2words(vectorSize)];
096      }
097    
098      /** returns the number of 64 bit words it would take to hold vectorSize buckets */
099      private static int buckets2words(int vectorSize) {
100       return ((vectorSize - 1) >>> 4) + 1;
101      }
102    
103    
104      @Override
105      public void add(Key key) {
106        if(key == null) {
107          throw new NullPointerException("key can not be null");
108        }
109    
110        int[] h = hash.hash(key);
111        hash.clear();
112    
113        for(int i = 0; i < nbHash; i++) {
114          // find the bucket
115          int wordNum = h[i] >> 4;          // div 16
116          int bucketShift = (h[i] & 0x0f) << 2;  // (mod 16) * 4
117          
118          long bucketMask = 15L << bucketShift;
119          long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
120          
121          // only increment if the count in the bucket is less than BUCKET_MAX_VALUE
122          if(bucketValue < BUCKET_MAX_VALUE) {
123            // increment by 1
124            buckets[wordNum] = (buckets[wordNum] & ~bucketMask) | ((bucketValue + 1) << bucketShift);
125          }
126        }
127      }
128    
129      /**
130       * Removes a specified key from <i>this</i> counting Bloom filter.
131       * <p>
132       * <b>Invariant</b>: nothing happens if the specified key does not belong to <i>this</i> counter Bloom filter.
133       * @param key The key to remove.
134       */
135      public void delete(Key key) {
136        if(key == null) {
137          throw new NullPointerException("Key may not be null");
138        }
139        if(!membershipTest(key)) {
140          throw new IllegalArgumentException("Key is not a member");
141        }
142    
143        int[] h = hash.hash(key);
144        hash.clear();
145    
146        for(int i = 0; i < nbHash; i++) {
147          // find the bucket
148          int wordNum = h[i] >> 4;          // div 16
149          int bucketShift = (h[i] & 0x0f) << 2;  // (mod 16) * 4
150          
151          long bucketMask = 15L << bucketShift;
152          long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
153          
154          // only decrement if the count in the bucket is between 0 and BUCKET_MAX_VALUE
155          if(bucketValue >= 1 && bucketValue < BUCKET_MAX_VALUE) {
156            // decrement by 1
157            buckets[wordNum] = (buckets[wordNum] & ~bucketMask) | ((bucketValue - 1) << bucketShift);
158          }
159        }
160      }
161    
162      @Override
163      public void and(Filter filter) {
164        if(filter == null
165            || !(filter instanceof CountingBloomFilter)
166            || filter.vectorSize != this.vectorSize
167            || filter.nbHash != this.nbHash) {
168          throw new IllegalArgumentException("filters cannot be and-ed");
169        }
170        CountingBloomFilter cbf = (CountingBloomFilter)filter;
171        
172        int sizeInWords = buckets2words(vectorSize);
173        for(int i = 0; i < sizeInWords; i++) {
174          this.buckets[i] &= cbf.buckets[i];
175        }
176      }
177    
178      @Override
179      public boolean membershipTest(Key key) {
180        if(key == null) {
181          throw new NullPointerException("Key may not be null");
182        }
183    
184        int[] h = hash.hash(key);
185        hash.clear();
186    
187        for(int i = 0; i < nbHash; i++) {
188          // find the bucket
189          int wordNum = h[i] >> 4;          // div 16
190          int bucketShift = (h[i] & 0x0f) << 2;  // (mod 16) * 4
191    
192          long bucketMask = 15L << bucketShift;
193    
194          if((buckets[wordNum] & bucketMask) == 0) {
195            return false;
196          }
197        }
198    
199        return true;
200      }
201    
202      /**
203       * This method calculates an approximate count of the key, i.e. how many
204       * times the key was added to the filter. This allows the filter to be
205       * used as an approximate <code>key -&gt; count</code> map.
206       * <p>NOTE: due to the bucket size of this filter, inserting the same
207       * key more than 15 times will cause an overflow at all filter positions
208       * associated with this key, and it will significantly increase the error
209       * rate for this and other keys. For this reason the filter can only be
210       * used to store small count values <code>0 &lt;= N &lt;&lt; 15</code>.
211       * @param key key to be tested
212       * @return 0 if the key is not present. Otherwise, a positive value v will
213       * be returned such that <code>v == count</code> with probability equal to the
214       * error rate of this filter, and <code>v &gt; count</code> otherwise.
215       * Additionally, if the filter experienced an underflow as a result of
216       * {@link #delete(Key)} operation, the return value may be lower than the
217       * <code>count</code> with the probability of the false negative rate of such
218       * filter.
219       */
220      public int approximateCount(Key key) {
221        int res = Integer.MAX_VALUE;
222        int[] h = hash.hash(key);
223        hash.clear();
224        for (int i = 0; i < nbHash; i++) {
225          // find the bucket
226          int wordNum = h[i] >> 4;          // div 16
227          int bucketShift = (h[i] & 0x0f) << 2;  // (mod 16) * 4
228          
229          long bucketMask = 15L << bucketShift;
230          long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
231          if (bucketValue < res) res = (int)bucketValue;
232        }
233        if (res != Integer.MAX_VALUE) {
234          return res;
235        } else {
236          return 0;
237        }
238      }
239    
240      @Override
241      public void not() {
242        throw new UnsupportedOperationException("not() is undefined for "
243            + this.getClass().getName());
244      }
245    
246      @Override
247      public void or(Filter filter) {
248        if(filter == null
249            || !(filter instanceof CountingBloomFilter)
250            || filter.vectorSize != this.vectorSize
251            || filter.nbHash != this.nbHash) {
252          throw new IllegalArgumentException("filters cannot be or-ed");
253        }
254    
255        CountingBloomFilter cbf = (CountingBloomFilter)filter;
256    
257        int sizeInWords = buckets2words(vectorSize);
258        for(int i = 0; i < sizeInWords; i++) {
259          this.buckets[i] |= cbf.buckets[i];
260        }
261      }
262    
263      @Override
264      public void xor(Filter filter) {
265        throw new UnsupportedOperationException("xor() is undefined for "
266            + this.getClass().getName());
267      }
268    
269      @Override
270      public String toString() {
271        StringBuilder res = new StringBuilder();
272    
273        for(int i = 0; i < vectorSize; i++) {
274          if(i > 0) {
275            res.append(" ");
276          }
277          
278          int wordNum = i >> 4;          // div 16
279          int bucketShift = (i & 0x0f) << 2;  // (mod 16) * 4
280          
281          long bucketMask = 15L << bucketShift;
282          long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
283          
284          res.append(bucketValue);
285        }
286    
287        return res.toString();
288      }
289    
290      // Writable
291    
292      @Override
293      public void write(DataOutput out) throws IOException {
294        super.write(out);
295        int sizeInWords = buckets2words(vectorSize);
296        for(int i = 0; i < sizeInWords; i++) {
297          out.writeLong(buckets[i]);
298        }
299      }
300    
301      @Override
302      public void readFields(DataInput in) throws IOException {
303        super.readFields(in);
304        int sizeInWords = buckets2words(vectorSize);
305        buckets = new long[sizeInWords];
306        for(int i = 0; i < sizeInWords; i++) {
307          buckets[i] = in.readLong();
308        }
309      }
310    }