001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.IOException; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceStability; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.io.SequenceFile.CompressionType; 033import org.apache.hadoop.io.compress.CompressionCodec; 034import org.apache.hadoop.util.Progressable; 035import org.apache.hadoop.util.bloom.DynamicBloomFilter; 036import org.apache.hadoop.util.bloom.Filter; 037import org.apache.hadoop.util.bloom.Key; 038import org.apache.hadoop.util.hash.Hash; 039 040/** 041 * This class extends {@link MapFile} and provides very much the same 042 * functionality. However, it uses dynamic Bloom filters to provide 043 * quick membership test for keys, and it offers a fast version of 044 * {@link Reader#get(WritableComparable, Writable)} operation, especially in 045 * case of sparsely populated MapFile-s. 046 */ 047@InterfaceAudience.Public 048@InterfaceStability.Stable 049public class BloomMapFile { 050 private static final Log LOG = LogFactory.getLog(BloomMapFile.class); 051 public static final String BLOOM_FILE_NAME = "bloom"; 052 public static final int HASH_COUNT = 5; 053 054 public static void delete(FileSystem fs, String name) throws IOException { 055 Path dir = new Path(name); 056 Path data = new Path(dir, MapFile.DATA_FILE_NAME); 057 Path index = new Path(dir, MapFile.INDEX_FILE_NAME); 058 Path bloom = new Path(dir, BLOOM_FILE_NAME); 059 060 fs.delete(data, true); 061 fs.delete(index, true); 062 fs.delete(bloom, true); 063 fs.delete(dir, true); 064 } 065 066 private static byte[] byteArrayForBloomKey(DataOutputBuffer buf) { 067 int cleanLength = buf.getLength(); 068 byte [] ba = buf.getData(); 069 if (cleanLength != ba.length) { 070 ba = new byte[cleanLength]; 071 System.arraycopy(buf.getData(), 0, ba, 0, cleanLength); 072 } 073 return ba; 074 } 075 076 public static class Writer extends MapFile.Writer { 077 private DynamicBloomFilter bloomFilter; 078 private int numKeys; 079 private int vectorSize; 080 private Key bloomKey = new Key(); 081 private DataOutputBuffer buf = new DataOutputBuffer(); 082 private FileSystem fs; 083 private Path dir; 084 085 @Deprecated 086 public Writer(Configuration conf, FileSystem fs, String dirName, 087 Class<? extends WritableComparable> keyClass, 088 Class<? extends Writable> valClass, CompressionType compress, 089 CompressionCodec codec, Progressable progress) throws IOException { 090 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 091 compression(compress, codec), progressable(progress)); 092 } 093 094 @Deprecated 095 public Writer(Configuration conf, FileSystem fs, String dirName, 096 Class<? extends WritableComparable> keyClass, 097 Class valClass, CompressionType compress, 098 Progressable progress) throws IOException { 099 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 100 compression(compress), progressable(progress)); 101 } 102 103 @Deprecated 104 public Writer(Configuration conf, FileSystem fs, String dirName, 105 Class<? extends WritableComparable> keyClass, 106 Class valClass, CompressionType compress) 107 throws IOException { 108 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 109 compression(compress)); 110 } 111 112 @Deprecated 113 public Writer(Configuration conf, FileSystem fs, String dirName, 114 WritableComparator comparator, Class valClass, 115 CompressionType compress, CompressionCodec codec, Progressable progress) 116 throws IOException { 117 this(conf, new Path(dirName), comparator(comparator), 118 valueClass(valClass), compression(compress, codec), 119 progressable(progress)); 120 } 121 122 @Deprecated 123 public Writer(Configuration conf, FileSystem fs, String dirName, 124 WritableComparator comparator, Class valClass, 125 CompressionType compress, Progressable progress) throws IOException { 126 this(conf, new Path(dirName), comparator(comparator), 127 valueClass(valClass), compression(compress), 128 progressable(progress)); 129 } 130 131 @Deprecated 132 public Writer(Configuration conf, FileSystem fs, String dirName, 133 WritableComparator comparator, Class valClass, CompressionType compress) 134 throws IOException { 135 this(conf, new Path(dirName), comparator(comparator), 136 valueClass(valClass), compression(compress)); 137 } 138 139 @Deprecated 140 public Writer(Configuration conf, FileSystem fs, String dirName, 141 WritableComparator comparator, Class valClass) throws IOException { 142 this(conf, new Path(dirName), comparator(comparator), 143 valueClass(valClass)); 144 } 145 146 @Deprecated 147 public Writer(Configuration conf, FileSystem fs, String dirName, 148 Class<? extends WritableComparable> keyClass, 149 Class valClass) throws IOException { 150 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass)); 151 } 152 153 public Writer(Configuration conf, Path dir, 154 SequenceFile.Writer.Option... options) throws IOException { 155 super(conf, dir, options); 156 this.fs = dir.getFileSystem(conf); 157 this.dir = dir; 158 initBloomFilter(conf); 159 } 160 161 private synchronized void initBloomFilter(Configuration conf) { 162 numKeys = conf.getInt("io.mapfile.bloom.size", 1024 * 1024); 163 // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for 164 // single key, where <code> is the number of hash functions, 165 // <code>n</code> is the number of keys and <code>c</code> is the desired 166 // max. error rate. 167 // Our desired error rate is by default 0.005, i.e. 0.5% 168 float errorRate = conf.getFloat("io.mapfile.bloom.error.rate", 0.005f); 169 vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) / 170 Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT))); 171 bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT, 172 Hash.getHashType(conf), numKeys); 173 } 174 175 @Override 176 public synchronized void append(WritableComparable key, Writable val) 177 throws IOException { 178 super.append(key, val); 179 buf.reset(); 180 key.write(buf); 181 bloomKey.set(byteArrayForBloomKey(buf), 1.0); 182 bloomFilter.add(bloomKey); 183 } 184 185 @Override 186 public synchronized void close() throws IOException { 187 super.close(); 188 DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true); 189 try { 190 bloomFilter.write(out); 191 out.flush(); 192 out.close(); 193 out = null; 194 } finally { 195 IOUtils.closeStream(out); 196 } 197 } 198 199 } 200 201 public static class Reader extends MapFile.Reader { 202 private DynamicBloomFilter bloomFilter; 203 private DataOutputBuffer buf = new DataOutputBuffer(); 204 private Key bloomKey = new Key(); 205 206 public Reader(Path dir, Configuration conf, 207 SequenceFile.Reader.Option... options) throws IOException { 208 super(dir, conf, options); 209 initBloomFilter(dir, conf); 210 } 211 212 @Deprecated 213 public Reader(FileSystem fs, String dirName, Configuration conf) 214 throws IOException { 215 this(new Path(dirName), conf); 216 } 217 218 @Deprecated 219 public Reader(FileSystem fs, String dirName, WritableComparator comparator, 220 Configuration conf, boolean open) throws IOException { 221 this(new Path(dirName), conf, comparator(comparator)); 222 } 223 224 @Deprecated 225 public Reader(FileSystem fs, String dirName, WritableComparator comparator, 226 Configuration conf) throws IOException { 227 this(new Path(dirName), conf, comparator(comparator)); 228 } 229 230 private void initBloomFilter(Path dirName, 231 Configuration conf) { 232 233 DataInputStream in = null; 234 try { 235 FileSystem fs = dirName.getFileSystem(conf); 236 in = fs.open(new Path(dirName, BLOOM_FILE_NAME)); 237 bloomFilter = new DynamicBloomFilter(); 238 bloomFilter.readFields(in); 239 in.close(); 240 in = null; 241 } catch (IOException ioe) { 242 LOG.warn("Can't open BloomFilter: " + ioe + " - fallback to MapFile."); 243 bloomFilter = null; 244 } finally { 245 IOUtils.closeStream(in); 246 } 247 } 248 249 /** 250 * Checks if this MapFile has the indicated key. The membership test is 251 * performed using a Bloom filter, so the result has always non-zero 252 * probability of false positives. 253 * @param key key to check 254 * @return false iff key doesn't exist, true if key probably exists. 255 * @throws IOException 256 */ 257 public boolean probablyHasKey(WritableComparable key) throws IOException { 258 if (bloomFilter == null) { 259 return true; 260 } 261 buf.reset(); 262 key.write(buf); 263 bloomKey.set(byteArrayForBloomKey(buf), 1.0); 264 return bloomFilter.membershipTest(bloomKey); 265 } 266 267 /** 268 * Fast version of the 269 * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First 270 * it checks the Bloom filter for the existence of the key, and only if 271 * present it performs the real get operation. This yields significant 272 * performance improvements for get operations on sparsely populated files. 273 */ 274 @Override 275 public synchronized Writable get(WritableComparable key, Writable val) 276 throws IOException { 277 if (!probablyHasKey(key)) { 278 return null; 279 } 280 return super.get(key, val); 281 } 282 283 /** 284 * Retrieve the Bloom filter used by this instance of the Reader. 285 * @return a Bloom filter (see {@link Filter}) 286 */ 287 public Filter getBloomFilter() { 288 return bloomFilter; 289 } 290 } 291}