001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.io.DataInputStream;
022 import java.io.DataOutputStream;
023 import java.io.IOException;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.hadoop.fs.FileSystem;
031 import org.apache.hadoop.fs.Path;
032 import org.apache.hadoop.io.SequenceFile.CompressionType;
033 import org.apache.hadoop.io.compress.CompressionCodec;
034 import org.apache.hadoop.util.Progressable;
035 import org.apache.hadoop.util.bloom.DynamicBloomFilter;
036 import org.apache.hadoop.util.bloom.Filter;
037 import org.apache.hadoop.util.bloom.Key;
038 import org.apache.hadoop.util.hash.Hash;
039
040 /**
041 * This class extends {@link MapFile} and provides very much the same
042 * functionality. However, it uses dynamic Bloom filters to provide
043 * quick membership test for keys, and it offers a fast version of
044 * {@link Reader#get(WritableComparable, Writable)} operation, especially in
045 * case of sparsely populated MapFile-s.
046 */
047 @InterfaceAudience.Public
048 @InterfaceStability.Stable
049 public class BloomMapFile {
050 private static final Log LOG = LogFactory.getLog(BloomMapFile.class);
051 public static final String BLOOM_FILE_NAME = "bloom";
052 public static final int HASH_COUNT = 5;
053
054 public static void delete(FileSystem fs, String name) throws IOException {
055 Path dir = new Path(name);
056 Path data = new Path(dir, MapFile.DATA_FILE_NAME);
057 Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
058 Path bloom = new Path(dir, BLOOM_FILE_NAME);
059
060 fs.delete(data, true);
061 fs.delete(index, true);
062 fs.delete(bloom, true);
063 fs.delete(dir, true);
064 }
065
066 private static byte[] byteArrayForBloomKey(DataOutputBuffer buf) {
067 int cleanLength = buf.getLength();
068 byte [] ba = buf.getData();
069 if (cleanLength != ba.length) {
070 ba = new byte[cleanLength];
071 System.arraycopy(buf.getData(), 0, ba, 0, cleanLength);
072 }
073 return ba;
074 }
075
076 public static class Writer extends MapFile.Writer {
077 private DynamicBloomFilter bloomFilter;
078 private int numKeys;
079 private int vectorSize;
080 private Key bloomKey = new Key();
081 private DataOutputBuffer buf = new DataOutputBuffer();
082 private FileSystem fs;
083 private Path dir;
084
085 @Deprecated
086 public Writer(Configuration conf, FileSystem fs, String dirName,
087 Class<? extends WritableComparable> keyClass,
088 Class<? extends Writable> valClass, CompressionType compress,
089 CompressionCodec codec, Progressable progress) throws IOException {
090 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
091 compression(compress, codec), progressable(progress));
092 }
093
094 @Deprecated
095 public Writer(Configuration conf, FileSystem fs, String dirName,
096 Class<? extends WritableComparable> keyClass,
097 Class valClass, CompressionType compress,
098 Progressable progress) throws IOException {
099 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
100 compression(compress), progressable(progress));
101 }
102
103 @Deprecated
104 public Writer(Configuration conf, FileSystem fs, String dirName,
105 Class<? extends WritableComparable> keyClass,
106 Class valClass, CompressionType compress)
107 throws IOException {
108 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
109 compression(compress));
110 }
111
112 @Deprecated
113 public Writer(Configuration conf, FileSystem fs, String dirName,
114 WritableComparator comparator, Class valClass,
115 CompressionType compress, CompressionCodec codec, Progressable progress)
116 throws IOException {
117 this(conf, new Path(dirName), comparator(comparator),
118 valueClass(valClass), compression(compress, codec),
119 progressable(progress));
120 }
121
122 @Deprecated
123 public Writer(Configuration conf, FileSystem fs, String dirName,
124 WritableComparator comparator, Class valClass,
125 CompressionType compress, Progressable progress) throws IOException {
126 this(conf, new Path(dirName), comparator(comparator),
127 valueClass(valClass), compression(compress),
128 progressable(progress));
129 }
130
131 @Deprecated
132 public Writer(Configuration conf, FileSystem fs, String dirName,
133 WritableComparator comparator, Class valClass, CompressionType compress)
134 throws IOException {
135 this(conf, new Path(dirName), comparator(comparator),
136 valueClass(valClass), compression(compress));
137 }
138
139 @Deprecated
140 public Writer(Configuration conf, FileSystem fs, String dirName,
141 WritableComparator comparator, Class valClass) throws IOException {
142 this(conf, new Path(dirName), comparator(comparator),
143 valueClass(valClass));
144 }
145
146 @Deprecated
147 public Writer(Configuration conf, FileSystem fs, String dirName,
148 Class<? extends WritableComparable> keyClass,
149 Class valClass) throws IOException {
150 this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
151 }
152
153 public Writer(Configuration conf, Path dir,
154 SequenceFile.Writer.Option... options) throws IOException {
155 super(conf, dir, options);
156 this.fs = dir.getFileSystem(conf);
157 this.dir = dir;
158 initBloomFilter(conf);
159 }
160
161 private synchronized void initBloomFilter(Configuration conf) {
162 numKeys = conf.getInt("io.mapfile.bloom.size", 1024 * 1024);
163 // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for
164 // single key, where <code> is the number of hash functions,
165 // <code>n</code> is the number of keys and <code>c</code> is the desired
166 // max. error rate.
167 // Our desired error rate is by default 0.005, i.e. 0.5%
168 float errorRate = conf.getFloat("io.mapfile.bloom.error.rate", 0.005f);
169 vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) /
170 Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT)));
171 bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT,
172 Hash.getHashType(conf), numKeys);
173 }
174
175 @Override
176 public synchronized void append(WritableComparable key, Writable val)
177 throws IOException {
178 super.append(key, val);
179 buf.reset();
180 key.write(buf);
181 bloomKey.set(byteArrayForBloomKey(buf), 1.0);
182 bloomFilter.add(bloomKey);
183 }
184
185 @Override
186 public synchronized void close() throws IOException {
187 super.close();
188 DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true);
189 try {
190 bloomFilter.write(out);
191 out.flush();
192 out.close();
193 out = null;
194 } finally {
195 IOUtils.closeStream(out);
196 }
197 }
198
199 }
200
201 public static class Reader extends MapFile.Reader {
202 private DynamicBloomFilter bloomFilter;
203 private DataOutputBuffer buf = new DataOutputBuffer();
204 private Key bloomKey = new Key();
205
206 public Reader(Path dir, Configuration conf,
207 SequenceFile.Reader.Option... options) throws IOException {
208 super(dir, conf, options);
209 initBloomFilter(dir, conf);
210 }
211
212 @Deprecated
213 public Reader(FileSystem fs, String dirName, Configuration conf)
214 throws IOException {
215 this(new Path(dirName), conf);
216 }
217
218 @Deprecated
219 public Reader(FileSystem fs, String dirName, WritableComparator comparator,
220 Configuration conf, boolean open) throws IOException {
221 this(new Path(dirName), conf, comparator(comparator));
222 }
223
224 @Deprecated
225 public Reader(FileSystem fs, String dirName, WritableComparator comparator,
226 Configuration conf) throws IOException {
227 this(new Path(dirName), conf, comparator(comparator));
228 }
229
230 private void initBloomFilter(Path dirName,
231 Configuration conf) {
232
233 DataInputStream in = null;
234 try {
235 FileSystem fs = dirName.getFileSystem(conf);
236 in = fs.open(new Path(dirName, BLOOM_FILE_NAME));
237 bloomFilter = new DynamicBloomFilter();
238 bloomFilter.readFields(in);
239 in.close();
240 in = null;
241 } catch (IOException ioe) {
242 LOG.warn("Can't open BloomFilter: " + ioe + " - fallback to MapFile.");
243 bloomFilter = null;
244 } finally {
245 IOUtils.closeStream(in);
246 }
247 }
248
249 /**
250 * Checks if this MapFile has the indicated key. The membership test is
251 * performed using a Bloom filter, so the result has always non-zero
252 * probability of false positives.
253 * @param key key to check
254 * @return false iff key doesn't exist, true if key probably exists.
255 * @throws IOException
256 */
257 public boolean probablyHasKey(WritableComparable key) throws IOException {
258 if (bloomFilter == null) {
259 return true;
260 }
261 buf.reset();
262 key.write(buf);
263 bloomKey.set(byteArrayForBloomKey(buf), 1.0);
264 return bloomFilter.membershipTest(bloomKey);
265 }
266
267 /**
268 * Fast version of the
269 * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First
270 * it checks the Bloom filter for the existence of the key, and only if
271 * present it performs the real get operation. This yields significant
272 * performance improvements for get operations on sparsely populated files.
273 */
274 @Override
275 public synchronized Writable get(WritableComparable key, Writable val)
276 throws IOException {
277 if (!probablyHasKey(key)) {
278 return null;
279 }
280 return super.get(key, val);
281 }
282
283 /**
284 * Retrieve the Bloom filter used by this instance of the Reader.
285 * @return a Bloom filter (see {@link Filter})
286 */
287 public Filter getBloomFilter() {
288 return bloomFilter;
289 }
290 }
291 }