001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.IOException;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.io.SequenceFile.CompressionType;
033import org.apache.hadoop.io.compress.CompressionCodec;
034import org.apache.hadoop.util.Progressable;
035import org.apache.hadoop.util.bloom.DynamicBloomFilter;
036import org.apache.hadoop.util.bloom.Filter;
037import org.apache.hadoop.util.bloom.Key;
038import org.apache.hadoop.util.hash.Hash;
039
040import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_ERROR_RATE_DEFAULT;
041import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_ERROR_RATE_KEY;
042import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_SIZE_DEFAULT;
043import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_SIZE_KEY;
044
/**
 * This class extends {@link MapFile} and provides much the same
 * functionality. However, it uses dynamic Bloom filters to provide
 * a quick membership test for keys, and it offers a fast version of the
 * {@link Reader#get(WritableComparable, Writable)} operation, especially in
 * the case of sparsely populated MapFiles.
 */
052@InterfaceAudience.Public
053@InterfaceStability.Stable
054public class BloomMapFile {
  /** Logger; the reader uses it to warn when the Bloom filter cannot be loaded. */
  private static final Log LOG = LogFactory.getLog(BloomMapFile.class);
  /** Name of the file, inside the map directory, that stores the serialized Bloom filter. */
  public static final String BLOOM_FILE_NAME = "bloom";
  /** Number of hash functions handed to the Bloom filter created by the writer. */
  public static final int HASH_COUNT = 5;
058  
059  public static void delete(FileSystem fs, String name) throws IOException {
060    Path dir = new Path(name);
061    Path data = new Path(dir, MapFile.DATA_FILE_NAME);
062    Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
063    Path bloom = new Path(dir, BLOOM_FILE_NAME);
064
065    fs.delete(data, true);
066    fs.delete(index, true);
067    fs.delete(bloom, true);
068    fs.delete(dir, true);
069  }
070
071  private static byte[] byteArrayForBloomKey(DataOutputBuffer buf) {
072    int cleanLength = buf.getLength();
073    byte [] ba = buf.getData();
074    if (cleanLength != ba.length) {
075      ba = new byte[cleanLength];
076      System.arraycopy(buf.getData(), 0, ba, 0, cleanLength);
077    }
078    return ba;
079  }
080  
081  public static class Writer extends MapFile.Writer {
082    private DynamicBloomFilter bloomFilter;
083    private int numKeys;
084    private int vectorSize;
085    private Key bloomKey = new Key();
086    private DataOutputBuffer buf = new DataOutputBuffer();
087    private FileSystem fs;
088    private Path dir;
089    
090    @Deprecated
091    public Writer(Configuration conf, FileSystem fs, String dirName,
092        Class<? extends WritableComparable> keyClass,
093        Class<? extends Writable> valClass, CompressionType compress,
094        CompressionCodec codec, Progressable progress) throws IOException {
095      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
096           compression(compress, codec), progressable(progress));
097    }
098
099    @Deprecated
100    public Writer(Configuration conf, FileSystem fs, String dirName,
101        Class<? extends WritableComparable> keyClass,
102        Class valClass, CompressionType compress,
103        Progressable progress) throws IOException {
104      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
105           compression(compress), progressable(progress));
106    }
107
108    @Deprecated
109    public Writer(Configuration conf, FileSystem fs, String dirName,
110        Class<? extends WritableComparable> keyClass,
111        Class valClass, CompressionType compress)
112        throws IOException {
113      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
114           compression(compress));
115    }
116
117    @Deprecated
118    public Writer(Configuration conf, FileSystem fs, String dirName,
119        WritableComparator comparator, Class valClass,
120        CompressionType compress, CompressionCodec codec, Progressable progress)
121        throws IOException {
122      this(conf, new Path(dirName), comparator(comparator), 
123           valueClass(valClass), compression(compress, codec), 
124           progressable(progress));
125    }
126
127    @Deprecated
128    public Writer(Configuration conf, FileSystem fs, String dirName,
129        WritableComparator comparator, Class valClass,
130        CompressionType compress, Progressable progress) throws IOException {
131      this(conf, new Path(dirName), comparator(comparator), 
132           valueClass(valClass), compression(compress),
133           progressable(progress));
134    }
135
136    @Deprecated
137    public Writer(Configuration conf, FileSystem fs, String dirName,
138        WritableComparator comparator, Class valClass, CompressionType compress)
139        throws IOException {
140      this(conf, new Path(dirName), comparator(comparator), 
141           valueClass(valClass), compression(compress));
142    }
143
144    @Deprecated
145    public Writer(Configuration conf, FileSystem fs, String dirName,
146        WritableComparator comparator, Class valClass) throws IOException {
147      this(conf, new Path(dirName), comparator(comparator), 
148           valueClass(valClass));
149    }
150
151    @Deprecated
152    public Writer(Configuration conf, FileSystem fs, String dirName,
153                  Class<? extends WritableComparable> keyClass,
154                  Class valClass) throws IOException {
155      this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
156    }
157
158    public Writer(Configuration conf, Path dir, 
159                  SequenceFile.Writer.Option... options) throws IOException {
160      super(conf, dir, options);
161      this.fs = dir.getFileSystem(conf);
162      this.dir = dir;
163      initBloomFilter(conf);
164    }
165
166    private synchronized void initBloomFilter(Configuration conf) {
167      numKeys = conf.getInt(
168          IO_MAPFILE_BLOOM_SIZE_KEY, IO_MAPFILE_BLOOM_SIZE_DEFAULT);
169      // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for
170      // single key, where <code> is the number of hash functions,
171      // <code>n</code> is the number of keys and <code>c</code> is the desired
172      // max. error rate.
173      // Our desired error rate is by default 0.005, i.e. 0.5%
174      float errorRate = conf.getFloat(
175          IO_MAPFILE_BLOOM_ERROR_RATE_KEY, IO_MAPFILE_BLOOM_ERROR_RATE_DEFAULT);
176      vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) /
177          Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT)));
178      bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT,
179          Hash.getHashType(conf), numKeys);
180    }
181
182    @Override
183    public synchronized void append(WritableComparable key, Writable val)
184        throws IOException {
185      super.append(key, val);
186      buf.reset();
187      key.write(buf);
188      bloomKey.set(byteArrayForBloomKey(buf), 1.0);
189      bloomFilter.add(bloomKey);
190    }
191
192    @Override
193    public synchronized void close() throws IOException {
194      super.close();
195      DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true);
196      try {
197        bloomFilter.write(out);
198        out.flush();
199        out.close();
200        out = null;
201      } finally {
202        IOUtils.closeStream(out);
203      }
204    }
205
206  }
207  
208  public static class Reader extends MapFile.Reader {
209    private DynamicBloomFilter bloomFilter;
210    private DataOutputBuffer buf = new DataOutputBuffer();
211    private Key bloomKey = new Key();
212
213    public Reader(Path dir, Configuration conf,
214                  SequenceFile.Reader.Option... options) throws IOException {
215      super(dir, conf, options);
216      initBloomFilter(dir, conf);
217    }
218
219    @Deprecated
220    public Reader(FileSystem fs, String dirName, Configuration conf)
221        throws IOException {
222      this(new Path(dirName), conf);
223    }
224
225    @Deprecated
226    public Reader(FileSystem fs, String dirName, WritableComparator comparator,
227        Configuration conf, boolean open) throws IOException {
228      this(new Path(dirName), conf, comparator(comparator));
229    }
230
231    @Deprecated
232    public Reader(FileSystem fs, String dirName, WritableComparator comparator,
233        Configuration conf) throws IOException {
234      this(new Path(dirName), conf, comparator(comparator));
235    }
236    
237    private void initBloomFilter(Path dirName, 
238                                 Configuration conf) {
239      
240      DataInputStream in = null;
241      try {
242        FileSystem fs = dirName.getFileSystem(conf);
243        in = fs.open(new Path(dirName, BLOOM_FILE_NAME));
244        bloomFilter = new DynamicBloomFilter();
245        bloomFilter.readFields(in);
246        in.close();
247        in = null;
248      } catch (IOException ioe) {
249        LOG.warn("Can't open BloomFilter: " + ioe + " - fallback to MapFile.");
250        bloomFilter = null;
251      } finally {
252        IOUtils.closeStream(in);
253      }
254    }
255    
256    /**
257     * Checks if this MapFile has the indicated key. The membership test is
258     * performed using a Bloom filter, so the result has always non-zero
259     * probability of false positives.
260     * @param key key to check
261     * @return  false iff key doesn't exist, true if key probably exists.
262     * @throws IOException
263     */
264    public boolean probablyHasKey(WritableComparable key) throws IOException {
265      if (bloomFilter == null) {
266        return true;
267      }
268      buf.reset();
269      key.write(buf);
270      bloomKey.set(byteArrayForBloomKey(buf), 1.0);
271      return bloomFilter.membershipTest(bloomKey);
272    }
273    
274    /**
275     * Fast version of the
276     * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First
277     * it checks the Bloom filter for the existence of the key, and only if
278     * present it performs the real get operation. This yields significant
279     * performance improvements for get operations on sparsely populated files.
280     */
281    @Override
282    public synchronized Writable get(WritableComparable key, Writable val)
283        throws IOException {
284      if (!probablyHasKey(key)) {
285        return null;
286      }
287      return super.get(key, val);
288    }
289    
290    /**
291     * Retrieve the Bloom filter used by this instance of the Reader.
292     * @return a Bloom filter (see {@link Filter})
293     */
294    public Filter getBloomFilter() {
295      return bloomFilter;
296    }
297  }
298}