001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.IOException;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.io.SequenceFile.CompressionType;
033import org.apache.hadoop.io.compress.CompressionCodec;
034import org.apache.hadoop.util.Progressable;
035import org.apache.hadoop.util.bloom.DynamicBloomFilter;
036import org.apache.hadoop.util.bloom.Filter;
037import org.apache.hadoop.util.bloom.Key;
038import org.apache.hadoop.util.hash.Hash;
039
/**
 * Provides the same functionality as {@link MapFile}: its nested Writer and
 * Reader extend {@link MapFile.Writer} and {@link MapFile.Reader}. In addition,
 * it maintains a dynamic Bloom filter over the keys, which enables a quick
 * membership test and a fast version of the
 * {@link Reader#get(WritableComparable, Writable)} operation, especially for
 * sparsely populated MapFiles.
 */
047@InterfaceAudience.Public
048@InterfaceStability.Stable
049public class BloomMapFile {
050  private static final Log LOG = LogFactory.getLog(BloomMapFile.class);
051  public static final String BLOOM_FILE_NAME = "bloom";
052  public static final int HASH_COUNT = 5;
053  
054  public static void delete(FileSystem fs, String name) throws IOException {
055    Path dir = new Path(name);
056    Path data = new Path(dir, MapFile.DATA_FILE_NAME);
057    Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
058    Path bloom = new Path(dir, BLOOM_FILE_NAME);
059
060    fs.delete(data, true);
061    fs.delete(index, true);
062    fs.delete(bloom, true);
063    fs.delete(dir, true);
064  }
065
066  private static byte[] byteArrayForBloomKey(DataOutputBuffer buf) {
067    int cleanLength = buf.getLength();
068    byte [] ba = buf.getData();
069    if (cleanLength != ba.length) {
070      ba = new byte[cleanLength];
071      System.arraycopy(buf.getData(), 0, ba, 0, cleanLength);
072    }
073    return ba;
074  }
075  
076  public static class Writer extends MapFile.Writer {
077    private DynamicBloomFilter bloomFilter;
078    private int numKeys;
079    private int vectorSize;
080    private Key bloomKey = new Key();
081    private DataOutputBuffer buf = new DataOutputBuffer();
082    private FileSystem fs;
083    private Path dir;
084    
085    @Deprecated
086    public Writer(Configuration conf, FileSystem fs, String dirName,
087        Class<? extends WritableComparable> keyClass,
088        Class<? extends Writable> valClass, CompressionType compress,
089        CompressionCodec codec, Progressable progress) throws IOException {
090      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
091           compression(compress, codec), progressable(progress));
092    }
093
094    @Deprecated
095    public Writer(Configuration conf, FileSystem fs, String dirName,
096        Class<? extends WritableComparable> keyClass,
097        Class valClass, CompressionType compress,
098        Progressable progress) throws IOException {
099      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
100           compression(compress), progressable(progress));
101    }
102
103    @Deprecated
104    public Writer(Configuration conf, FileSystem fs, String dirName,
105        Class<? extends WritableComparable> keyClass,
106        Class valClass, CompressionType compress)
107        throws IOException {
108      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
109           compression(compress));
110    }
111
112    @Deprecated
113    public Writer(Configuration conf, FileSystem fs, String dirName,
114        WritableComparator comparator, Class valClass,
115        CompressionType compress, CompressionCodec codec, Progressable progress)
116        throws IOException {
117      this(conf, new Path(dirName), comparator(comparator), 
118           valueClass(valClass), compression(compress, codec), 
119           progressable(progress));
120    }
121
122    @Deprecated
123    public Writer(Configuration conf, FileSystem fs, String dirName,
124        WritableComparator comparator, Class valClass,
125        CompressionType compress, Progressable progress) throws IOException {
126      this(conf, new Path(dirName), comparator(comparator), 
127           valueClass(valClass), compression(compress),
128           progressable(progress));
129    }
130
131    @Deprecated
132    public Writer(Configuration conf, FileSystem fs, String dirName,
133        WritableComparator comparator, Class valClass, CompressionType compress)
134        throws IOException {
135      this(conf, new Path(dirName), comparator(comparator), 
136           valueClass(valClass), compression(compress));
137    }
138
139    @Deprecated
140    public Writer(Configuration conf, FileSystem fs, String dirName,
141        WritableComparator comparator, Class valClass) throws IOException {
142      this(conf, new Path(dirName), comparator(comparator), 
143           valueClass(valClass));
144    }
145
146    @Deprecated
147    public Writer(Configuration conf, FileSystem fs, String dirName,
148                  Class<? extends WritableComparable> keyClass,
149                  Class valClass) throws IOException {
150      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
151    }
152
153    public Writer(Configuration conf, Path dir, 
154                  SequenceFile.Writer.Option... options) throws IOException {
155      super(conf, dir, options);
156      this.fs = dir.getFileSystem(conf);
157      this.dir = dir;
158      initBloomFilter(conf);
159    }
160
161    private synchronized void initBloomFilter(Configuration conf) {
162      numKeys = conf.getInt("io.mapfile.bloom.size", 1024 * 1024);
163      // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for
164      // single key, where <code> is the number of hash functions,
165      // <code>n</code> is the number of keys and <code>c</code> is the desired
166      // max. error rate.
167      // Our desired error rate is by default 0.005, i.e. 0.5%
168      float errorRate = conf.getFloat("io.mapfile.bloom.error.rate", 0.005f);
169      vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) /
170          Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT)));
171      bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT,
172          Hash.getHashType(conf), numKeys);
173    }
174
175    @Override
176    public synchronized void append(WritableComparable key, Writable val)
177        throws IOException {
178      super.append(key, val);
179      buf.reset();
180      key.write(buf);
181      bloomKey.set(byteArrayForBloomKey(buf), 1.0);
182      bloomFilter.add(bloomKey);
183    }
184
185    @Override
186    public synchronized void close() throws IOException {
187      super.close();
188      DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true);
189      try {
190        bloomFilter.write(out);
191        out.flush();
192        out.close();
193        out = null;
194      } finally {
195        IOUtils.closeStream(out);
196      }
197    }
198
199  }
200  
201  public static class Reader extends MapFile.Reader {
202    private DynamicBloomFilter bloomFilter;
203    private DataOutputBuffer buf = new DataOutputBuffer();
204    private Key bloomKey = new Key();
205
206    public Reader(Path dir, Configuration conf,
207                  SequenceFile.Reader.Option... options) throws IOException {
208      super(dir, conf, options);
209      initBloomFilter(dir, conf);
210    }
211
212    @Deprecated
213    public Reader(FileSystem fs, String dirName, Configuration conf)
214        throws IOException {
215      this(new Path(dirName), conf);
216    }
217
218    @Deprecated
219    public Reader(FileSystem fs, String dirName, WritableComparator comparator,
220        Configuration conf, boolean open) throws IOException {
221      this(new Path(dirName), conf, comparator(comparator));
222    }
223
224    @Deprecated
225    public Reader(FileSystem fs, String dirName, WritableComparator comparator,
226        Configuration conf) throws IOException {
227      this(new Path(dirName), conf, comparator(comparator));
228    }
229    
230    private void initBloomFilter(Path dirName, 
231                                 Configuration conf) {
232      
233      DataInputStream in = null;
234      try {
235        FileSystem fs = dirName.getFileSystem(conf);
236        in = fs.open(new Path(dirName, BLOOM_FILE_NAME));
237        bloomFilter = new DynamicBloomFilter();
238        bloomFilter.readFields(in);
239        in.close();
240        in = null;
241      } catch (IOException ioe) {
242        LOG.warn("Can't open BloomFilter: " + ioe + " - fallback to MapFile.");
243        bloomFilter = null;
244      } finally {
245        IOUtils.closeStream(in);
246      }
247    }
248    
249    /**
250     * Checks if this MapFile has the indicated key. The membership test is
251     * performed using a Bloom filter, so the result has always non-zero
252     * probability of false positives.
253     * @param key key to check
254     * @return  false iff key doesn't exist, true if key probably exists.
255     * @throws IOException
256     */
257    public boolean probablyHasKey(WritableComparable key) throws IOException {
258      if (bloomFilter == null) {
259        return true;
260      }
261      buf.reset();
262      key.write(buf);
263      bloomKey.set(byteArrayForBloomKey(buf), 1.0);
264      return bloomFilter.membershipTest(bloomKey);
265    }
266    
267    /**
268     * Fast version of the
269     * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First
270     * it checks the Bloom filter for the existence of the key, and only if
271     * present it performs the real get operation. This yields significant
272     * performance improvements for get operations on sparsely populated files.
273     */
274    @Override
275    public synchronized Writable get(WritableComparable key, Writable val)
276        throws IOException {
277      if (!probablyHasKey(key)) {
278        return null;
279      }
280      return super.get(key, val);
281    }
282    
283    /**
284     * Retrieve the Bloom filter used by this instance of the Reader.
285     * @return a Bloom filter (see {@link Filter})
286     */
287    public Filter getBloomFilter() {
288      return bloomFilter;
289    }
290  }
291}