001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io;
020    
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.IOException;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.fs.FileSystem;
031    import org.apache.hadoop.fs.Path;
032    import org.apache.hadoop.io.SequenceFile.CompressionType;
033    import org.apache.hadoop.io.compress.CompressionCodec;
034    import org.apache.hadoop.util.Progressable;
035    import org.apache.hadoop.util.bloom.DynamicBloomFilter;
036    import org.apache.hadoop.util.bloom.Filter;
037    import org.apache.hadoop.util.bloom.Key;
038    import org.apache.hadoop.util.hash.Hash;
039    
/**
 * This class is the Bloom-filter counterpart of {@link MapFile}: its nested
 * {@link Writer} and {@link Reader} extend the corresponding MapFile classes
 * and provide much the same functionality. In addition, they use dynamic
 * Bloom filters to provide a quick membership test for keys, and they offer a
 * fast version of the {@link Reader#get(WritableComparable, Writable)}
 * operation, especially in the case of sparsely populated MapFiles.
 */
047    @InterfaceAudience.Public
048    @InterfaceStability.Stable
049    public class BloomMapFile {
050      private static final Log LOG = LogFactory.getLog(BloomMapFile.class);
051      public static final String BLOOM_FILE_NAME = "bloom";
052      public static final int HASH_COUNT = 5;
053      
054      public static void delete(FileSystem fs, String name) throws IOException {
055        Path dir = new Path(name);
056        Path data = new Path(dir, MapFile.DATA_FILE_NAME);
057        Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
058        Path bloom = new Path(dir, BLOOM_FILE_NAME);
059    
060        fs.delete(data, true);
061        fs.delete(index, true);
062        fs.delete(bloom, true);
063        fs.delete(dir, true);
064      }
065    
066      private static byte[] byteArrayForBloomKey(DataOutputBuffer buf) {
067        int cleanLength = buf.getLength();
068        byte [] ba = buf.getData();
069        if (cleanLength != ba.length) {
070          ba = new byte[cleanLength];
071          System.arraycopy(buf.getData(), 0, ba, 0, cleanLength);
072        }
073        return ba;
074      }
075      
076      public static class Writer extends MapFile.Writer {
077        private DynamicBloomFilter bloomFilter;
078        private int numKeys;
079        private int vectorSize;
080        private Key bloomKey = new Key();
081        private DataOutputBuffer buf = new DataOutputBuffer();
082        private FileSystem fs;
083        private Path dir;
084        
085        @Deprecated
086        public Writer(Configuration conf, FileSystem fs, String dirName,
087            Class<? extends WritableComparable> keyClass,
088            Class<? extends Writable> valClass, CompressionType compress,
089            CompressionCodec codec, Progressable progress) throws IOException {
090          this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
091               compression(compress, codec), progressable(progress));
092        }
093    
094        @Deprecated
095        public Writer(Configuration conf, FileSystem fs, String dirName,
096            Class<? extends WritableComparable> keyClass,
097            Class valClass, CompressionType compress,
098            Progressable progress) throws IOException {
099          this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
100               compression(compress), progressable(progress));
101        }
102    
103        @Deprecated
104        public Writer(Configuration conf, FileSystem fs, String dirName,
105            Class<? extends WritableComparable> keyClass,
106            Class valClass, CompressionType compress)
107            throws IOException {
108          this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), 
109               compression(compress));
110        }
111    
112        @Deprecated
113        public Writer(Configuration conf, FileSystem fs, String dirName,
114            WritableComparator comparator, Class valClass,
115            CompressionType compress, CompressionCodec codec, Progressable progress)
116            throws IOException {
117          this(conf, new Path(dirName), comparator(comparator), 
118               valueClass(valClass), compression(compress, codec), 
119               progressable(progress));
120        }
121    
122        @Deprecated
123        public Writer(Configuration conf, FileSystem fs, String dirName,
124            WritableComparator comparator, Class valClass,
125            CompressionType compress, Progressable progress) throws IOException {
126          this(conf, new Path(dirName), comparator(comparator), 
127               valueClass(valClass), compression(compress),
128               progressable(progress));
129        }
130    
131        @Deprecated
132        public Writer(Configuration conf, FileSystem fs, String dirName,
133            WritableComparator comparator, Class valClass, CompressionType compress)
134            throws IOException {
135          this(conf, new Path(dirName), comparator(comparator), 
136               valueClass(valClass), compression(compress));
137        }
138    
139        @Deprecated
140        public Writer(Configuration conf, FileSystem fs, String dirName,
141            WritableComparator comparator, Class valClass) throws IOException {
142          this(conf, new Path(dirName), comparator(comparator), 
143               valueClass(valClass));
144        }
145    
146        @Deprecated
147        public Writer(Configuration conf, FileSystem fs, String dirName,
148                      Class<? extends WritableComparable> keyClass,
149                      Class valClass) throws IOException {
150          this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
151        }
152    
153        public Writer(Configuration conf, Path dir, 
154                      SequenceFile.Writer.Option... options) throws IOException {
155          super(conf, dir, options);
156          this.fs = dir.getFileSystem(conf);
157          this.dir = dir;
158          initBloomFilter(conf);
159        }
160    
161        private synchronized void initBloomFilter(Configuration conf) {
162          numKeys = conf.getInt("io.mapfile.bloom.size", 1024 * 1024);
163          // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for
164          // single key, where <code> is the number of hash functions,
165          // <code>n</code> is the number of keys and <code>c</code> is the desired
166          // max. error rate.
167          // Our desired error rate is by default 0.005, i.e. 0.5%
168          float errorRate = conf.getFloat("io.mapfile.bloom.error.rate", 0.005f);
169          vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) /
170              Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT)));
171          bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT,
172              Hash.getHashType(conf), numKeys);
173        }
174    
175        @Override
176        public synchronized void append(WritableComparable key, Writable val)
177            throws IOException {
178          super.append(key, val);
179          buf.reset();
180          key.write(buf);
181          bloomKey.set(byteArrayForBloomKey(buf), 1.0);
182          bloomFilter.add(bloomKey);
183        }
184    
185        @Override
186        public synchronized void close() throws IOException {
187          super.close();
188          DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true);
189          try {
190            bloomFilter.write(out);
191            out.flush();
192            out.close();
193            out = null;
194          } finally {
195            IOUtils.closeStream(out);
196          }
197        }
198    
199      }
200      
201      public static class Reader extends MapFile.Reader {
202        private DynamicBloomFilter bloomFilter;
203        private DataOutputBuffer buf = new DataOutputBuffer();
204        private Key bloomKey = new Key();
205    
206        public Reader(Path dir, Configuration conf,
207                      SequenceFile.Reader.Option... options) throws IOException {
208          super(dir, conf, options);
209          initBloomFilter(dir, conf);
210        }
211    
212        @Deprecated
213        public Reader(FileSystem fs, String dirName, Configuration conf)
214            throws IOException {
215          this(new Path(dirName), conf);
216        }
217    
218        @Deprecated
219        public Reader(FileSystem fs, String dirName, WritableComparator comparator,
220            Configuration conf, boolean open) throws IOException {
221          this(new Path(dirName), conf, comparator(comparator));
222        }
223    
224        @Deprecated
225        public Reader(FileSystem fs, String dirName, WritableComparator comparator,
226            Configuration conf) throws IOException {
227          this(new Path(dirName), conf, comparator(comparator));
228        }
229        
230        private void initBloomFilter(Path dirName, 
231                                     Configuration conf) {
232          
233          DataInputStream in = null;
234          try {
235            FileSystem fs = dirName.getFileSystem(conf);
236            in = fs.open(new Path(dirName, BLOOM_FILE_NAME));
237            bloomFilter = new DynamicBloomFilter();
238            bloomFilter.readFields(in);
239            in.close();
240            in = null;
241          } catch (IOException ioe) {
242            LOG.warn("Can't open BloomFilter: " + ioe + " - fallback to MapFile.");
243            bloomFilter = null;
244          } finally {
245            IOUtils.closeStream(in);
246          }
247        }
248        
249        /**
250         * Checks if this MapFile has the indicated key. The membership test is
251         * performed using a Bloom filter, so the result has always non-zero
252         * probability of false positives.
253         * @param key key to check
254         * @return  false iff key doesn't exist, true if key probably exists.
255         * @throws IOException
256         */
257        public boolean probablyHasKey(WritableComparable key) throws IOException {
258          if (bloomFilter == null) {
259            return true;
260          }
261          buf.reset();
262          key.write(buf);
263          bloomKey.set(byteArrayForBloomKey(buf), 1.0);
264          return bloomFilter.membershipTest(bloomKey);
265        }
266        
267        /**
268         * Fast version of the
269         * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First
270         * it checks the Bloom filter for the existence of the key, and only if
271         * present it performs the real get operation. This yields significant
272         * performance improvements for get operations on sparsely populated files.
273         */
274        @Override
275        public synchronized Writable get(WritableComparable key, Writable val)
276            throws IOException {
277          if (!probablyHasKey(key)) {
278            return null;
279          }
280          return super.get(key, val);
281        }
282        
283        /**
284         * Retrieve the Bloom filter used by this instance of the Reader.
285         * @return a Bloom filter (see {@link Filter})
286         */
287        public Filter getBloomFilter() {
288          return bloomFilter;
289        }
290      }
291    }