001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.IOException; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceStability; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.io.SequenceFile.CompressionType; 033import org.apache.hadoop.io.compress.CompressionCodec; 034import org.apache.hadoop.util.Progressable; 035import org.apache.hadoop.util.bloom.DynamicBloomFilter; 036import org.apache.hadoop.util.bloom.Filter; 037import org.apache.hadoop.util.bloom.Key; 038import org.apache.hadoop.util.hash.Hash; 039 040import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_ERROR_RATE_DEFAULT; 041import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_ERROR_RATE_KEY; 042import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_SIZE_DEFAULT; 043import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_MAPFILE_BLOOM_SIZE_KEY; 044 045/** 046 * This class extends {@link MapFile} and provides very much the same 047 * functionality. However, it uses dynamic Bloom filters to provide 048 * quick membership test for keys, and it offers a fast version of 049 * {@link Reader#get(WritableComparable, Writable)} operation, especially in 050 * case of sparsely populated MapFile-s. 051 */ 052@InterfaceAudience.Public 053@InterfaceStability.Stable 054public class BloomMapFile { 055 private static final Log LOG = LogFactory.getLog(BloomMapFile.class); 056 public static final String BLOOM_FILE_NAME = "bloom"; 057 public static final int HASH_COUNT = 5; 058 059 public static void delete(FileSystem fs, String name) throws IOException { 060 Path dir = new Path(name); 061 Path data = new Path(dir, MapFile.DATA_FILE_NAME); 062 Path index = new Path(dir, MapFile.INDEX_FILE_NAME); 063 Path bloom = new Path(dir, BLOOM_FILE_NAME); 064 065 fs.delete(data, true); 066 fs.delete(index, true); 067 fs.delete(bloom, true); 068 fs.delete(dir, true); 069 } 070 071 private static byte[] byteArrayForBloomKey(DataOutputBuffer buf) { 072 int cleanLength = buf.getLength(); 073 byte [] ba = buf.getData(); 074 if (cleanLength != ba.length) { 075 ba = new byte[cleanLength]; 076 System.arraycopy(buf.getData(), 0, ba, 0, cleanLength); 077 } 078 return ba; 079 } 080 081 public static class Writer extends MapFile.Writer { 082 private DynamicBloomFilter bloomFilter; 083 private int numKeys; 084 private int vectorSize; 085 private Key bloomKey = new Key(); 086 private DataOutputBuffer buf = new DataOutputBuffer(); 087 private FileSystem fs; 088 private Path dir; 089 090 @Deprecated 091 public Writer(Configuration conf, FileSystem fs, String dirName, 092 Class<? extends WritableComparable> keyClass, 093 Class<? extends Writable> valClass, CompressionType compress, 094 CompressionCodec codec, Progressable progress) throws IOException { 095 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 096 compression(compress, codec), progressable(progress)); 097 } 098 099 @Deprecated 100 public Writer(Configuration conf, FileSystem fs, String dirName, 101 Class<? extends WritableComparable> keyClass, 102 Class valClass, CompressionType compress, 103 Progressable progress) throws IOException { 104 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 105 compression(compress), progressable(progress)); 106 } 107 108 @Deprecated 109 public Writer(Configuration conf, FileSystem fs, String dirName, 110 Class<? extends WritableComparable> keyClass, 111 Class valClass, CompressionType compress) 112 throws IOException { 113 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 114 compression(compress)); 115 } 116 117 @Deprecated 118 public Writer(Configuration conf, FileSystem fs, String dirName, 119 WritableComparator comparator, Class valClass, 120 CompressionType compress, CompressionCodec codec, Progressable progress) 121 throws IOException { 122 this(conf, new Path(dirName), comparator(comparator), 123 valueClass(valClass), compression(compress, codec), 124 progressable(progress)); 125 } 126 127 @Deprecated 128 public Writer(Configuration conf, FileSystem fs, String dirName, 129 WritableComparator comparator, Class valClass, 130 CompressionType compress, Progressable progress) throws IOException { 131 this(conf, new Path(dirName), comparator(comparator), 132 valueClass(valClass), compression(compress), 133 progressable(progress)); 134 } 135 136 @Deprecated 137 public Writer(Configuration conf, FileSystem fs, String dirName, 138 WritableComparator comparator, Class valClass, CompressionType compress) 139 throws IOException { 140 this(conf, new Path(dirName), comparator(comparator), 141 valueClass(valClass), compression(compress)); 142 } 143 144 @Deprecated 145 public Writer(Configuration conf, FileSystem fs, String dirName, 146 WritableComparator comparator, Class valClass) throws IOException { 147 this(conf, new Path(dirName), comparator(comparator), 148 valueClass(valClass)); 149 } 150 151 @Deprecated 152 public Writer(Configuration conf, FileSystem fs, String dirName, 153 Class<? extends WritableComparable> keyClass, 154 Class valClass) throws IOException { 155 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass)); 156 } 157 158 public Writer(Configuration conf, Path dir, 159 SequenceFile.Writer.Option... options) throws IOException { 160 super(conf, dir, options); 161 this.fs = dir.getFileSystem(conf); 162 this.dir = dir; 163 initBloomFilter(conf); 164 } 165 166 private synchronized void initBloomFilter(Configuration conf) { 167 numKeys = conf.getInt( 168 IO_MAPFILE_BLOOM_SIZE_KEY, IO_MAPFILE_BLOOM_SIZE_DEFAULT); 169 // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for 170 // single key, where <code> is the number of hash functions, 171 // <code>n</code> is the number of keys and <code>c</code> is the desired 172 // max. error rate. 173 // Our desired error rate is by default 0.005, i.e. 0.5% 174 float errorRate = conf.getFloat( 175 IO_MAPFILE_BLOOM_ERROR_RATE_KEY, IO_MAPFILE_BLOOM_ERROR_RATE_DEFAULT); 176 vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) / 177 Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT))); 178 bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT, 179 Hash.getHashType(conf), numKeys); 180 } 181 182 @Override 183 public synchronized void append(WritableComparable key, Writable val) 184 throws IOException { 185 super.append(key, val); 186 buf.reset(); 187 key.write(buf); 188 bloomKey.set(byteArrayForBloomKey(buf), 1.0); 189 bloomFilter.add(bloomKey); 190 } 191 192 @Override 193 public synchronized void close() throws IOException { 194 super.close(); 195 DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true); 196 try { 197 bloomFilter.write(out); 198 out.flush(); 199 out.close(); 200 out = null; 201 } finally { 202 IOUtils.closeStream(out); 203 } 204 } 205 206 } 207 208 public static class Reader extends MapFile.Reader { 209 private DynamicBloomFilter bloomFilter; 210 private DataOutputBuffer buf = new DataOutputBuffer(); 211 private Key bloomKey = new Key(); 212 213 public Reader(Path dir, Configuration conf, 214 SequenceFile.Reader.Option... options) throws IOException { 215 super(dir, conf, options); 216 initBloomFilter(dir, conf); 217 } 218 219 @Deprecated 220 public Reader(FileSystem fs, String dirName, Configuration conf) 221 throws IOException { 222 this(new Path(dirName), conf); 223 } 224 225 @Deprecated 226 public Reader(FileSystem fs, String dirName, WritableComparator comparator, 227 Configuration conf, boolean open) throws IOException { 228 this(new Path(dirName), conf, comparator(comparator)); 229 } 230 231 @Deprecated 232 public Reader(FileSystem fs, String dirName, WritableComparator comparator, 233 Configuration conf) throws IOException { 234 this(new Path(dirName), conf, comparator(comparator)); 235 } 236 237 private void initBloomFilter(Path dirName, 238 Configuration conf) { 239 240 DataInputStream in = null; 241 try { 242 FileSystem fs = dirName.getFileSystem(conf); 243 in = fs.open(new Path(dirName, BLOOM_FILE_NAME)); 244 bloomFilter = new DynamicBloomFilter(); 245 bloomFilter.readFields(in); 246 in.close(); 247 in = null; 248 } catch (IOException ioe) { 249 LOG.warn("Can't open BloomFilter: " + ioe + " - fallback to MapFile."); 250 bloomFilter = null; 251 } finally { 252 IOUtils.closeStream(in); 253 } 254 } 255 256 /** 257 * Checks if this MapFile has the indicated key. The membership test is 258 * performed using a Bloom filter, so the result has always non-zero 259 * probability of false positives. 260 * @param key key to check 261 * @return false iff key doesn't exist, true if key probably exists. 262 * @throws IOException 263 */ 264 public boolean probablyHasKey(WritableComparable key) throws IOException { 265 if (bloomFilter == null) { 266 return true; 267 } 268 buf.reset(); 269 key.write(buf); 270 bloomKey.set(byteArrayForBloomKey(buf), 1.0); 271 return bloomFilter.membershipTest(bloomKey); 272 } 273 274 /** 275 * Fast version of the 276 * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First 277 * it checks the Bloom filter for the existence of the key, and only if 278 * present it performs the real get operation. This yields significant 279 * performance improvements for get operations on sparsely populated files. 280 */ 281 @Override 282 public synchronized Writable get(WritableComparable key, Writable val) 283 throws IOException { 284 if (!probablyHasKey(key)) { 285 return null; 286 } 287 return super.get(key, val); 288 } 289 290 /** 291 * Retrieve the Bloom filter used by this instance of the Reader. 292 * @return a Bloom filter (see {@link Filter}) 293 */ 294 public Filter getBloomFilter() { 295 return bloomFilter; 296 } 297 } 298}