001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.lib.input; 020 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.security.DigestException; 024import java.security.MessageDigest; 025import java.security.NoSuchAlgorithmException; 026import java.util.regex.Pattern; 027import java.util.regex.PatternSyntaxException; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.apache.hadoop.classification.InterfaceAudience; 032import org.apache.hadoop.classification.InterfaceStability; 033import org.apache.hadoop.conf.Configurable; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.io.BytesWritable; 036import org.apache.hadoop.io.Text; 037import org.apache.hadoop.mapreduce.InputSplit; 038import org.apache.hadoop.mapreduce.Job; 039import org.apache.hadoop.mapreduce.RecordReader; 040import org.apache.hadoop.mapreduce.TaskAttemptContext; 041import org.apache.hadoop.util.ReflectionUtils; 042 043/** 044 * A class that allows a map/red job to work on a sample of sequence files. 045 * The sample is decided by the filter class set by the job. 046 */ 047@InterfaceAudience.Public 048@InterfaceStability.Stable 049public class SequenceFileInputFilter<K, V> 050 extends SequenceFileInputFormat<K, V> { 051 public static final Log LOG = LogFactory.getLog(FileInputFormat.class); 052 053 final public static String FILTER_CLASS = 054 "mapreduce.input.sequencefileinputfilter.class"; 055 final public static String FILTER_FREQUENCY = 056 "mapreduce.input.sequencefileinputfilter.frequency"; 057 final public static String FILTER_REGEX = 058 "mapreduce.input.sequencefileinputfilter.regex"; 059 060 public SequenceFileInputFilter() { 061 } 062 063 /** Create a record reader for the given split 064 * @param split file split 065 * @param context the task-attempt context 066 * @return RecordReader 067 */ 068 public RecordReader<K, V> createRecordReader(InputSplit split, 069 TaskAttemptContext context) throws IOException { 070 context.setStatus(split.toString()); 071 return new FilterRecordReader<K, V>(context.getConfiguration()); 072 } 073 074 075 /** set the filter class 076 * 077 * @param job The job 078 * @param filterClass filter class 079 */ 080 public static void setFilterClass(Job job, Class<?> filterClass) { 081 job.getConfiguration().set(FILTER_CLASS, filterClass.getName()); 082 } 083 084 085 /** 086 * filter interface 087 */ 088 public interface Filter extends Configurable { 089 /** filter function 090 * Decide if a record should be filtered or not 091 * @param key record key 092 * @return true if a record is accepted; return false otherwise 093 */ 094 public abstract boolean accept(Object key); 095 } 096 097 /** 098 * base class for Filters 099 */ 100 public static abstract class FilterBase implements Filter { 101 Configuration conf; 102 103 public Configuration getConf() { 104 return conf; 105 } 106 } 107 108 /** Records filter by matching key to regex 109 */ 110 public static class RegexFilter extends FilterBase { 111 private Pattern p; 112 /** Define the filtering regex and stores it in conf 113 * @param conf where the regex is set 114 * @param regex regex used as a filter 115 */ 116 public static void setPattern(Configuration conf, String regex) 117 throws PatternSyntaxException { 118 try { 119 Pattern.compile(regex); 120 } catch (PatternSyntaxException e) { 121 throw new IllegalArgumentException("Invalid pattern: "+regex); 122 } 123 conf.set(FILTER_REGEX, regex); 124 } 125 126 public RegexFilter() { } 127 128 /** configure the Filter by checking the configuration 129 */ 130 public void setConf(Configuration conf) { 131 String regex = conf.get(FILTER_REGEX); 132 if (regex == null) 133 throw new RuntimeException(FILTER_REGEX + "not set"); 134 this.p = Pattern.compile(regex); 135 this.conf = conf; 136 } 137 138 139 /** Filtering method 140 * If key matches the regex, return true; otherwise return false 141 * @see Filter#accept(Object) 142 */ 143 public boolean accept(Object key) { 144 return p.matcher(key.toString()).matches(); 145 } 146 } 147 148 /** This class returns a percentage of records 149 * The percentage is determined by a filtering frequency <i>f</i> using 150 * the criteria record# % f == 0. 151 * For example, if the frequency is 10, one out of 10 records is returned. 152 */ 153 public static class PercentFilter extends FilterBase { 154 private int frequency; 155 private int count; 156 157 /** set the frequency and stores it in conf 158 * @param conf configuration 159 * @param frequency filtering frequencey 160 */ 161 public static void setFrequency(Configuration conf, int frequency) { 162 if (frequency <= 0) 163 throw new IllegalArgumentException( 164 "Negative " + FILTER_FREQUENCY + ": " + frequency); 165 conf.setInt(FILTER_FREQUENCY, frequency); 166 } 167 168 public PercentFilter() { } 169 170 /** configure the filter by checking the configuration 171 * 172 * @param conf configuration 173 */ 174 public void setConf(Configuration conf) { 175 this.frequency = conf.getInt(FILTER_FREQUENCY, 10); 176 if (this.frequency <= 0) { 177 throw new RuntimeException( 178 "Negative "+FILTER_FREQUENCY + ": " + this.frequency); 179 } 180 this.conf = conf; 181 } 182 183 /** Filtering method 184 * If record# % frequency==0, return true; otherwise return false 185 * @see Filter#accept(Object) 186 */ 187 public boolean accept(Object key) { 188 boolean accepted = false; 189 if (count == 0) 190 accepted = true; 191 if (++count == frequency) { 192 count = 0; 193 } 194 return accepted; 195 } 196 } 197 198 /** This class returns a set of records by examing the MD5 digest of its 199 * key against a filtering frequency <i>f</i>. The filtering criteria is 200 * MD5(key) % f == 0. 201 */ 202 public static class MD5Filter extends FilterBase { 203 private int frequency; 204 private static final MessageDigest DIGESTER; 205 public static final int MD5_LEN = 16; 206 private byte [] digest = new byte[MD5_LEN]; 207 208 static { 209 try { 210 DIGESTER = MessageDigest.getInstance("MD5"); 211 } catch (NoSuchAlgorithmException e) { 212 throw new RuntimeException(e); 213 } 214 } 215 216 217 /** set the filtering frequency in configuration 218 * 219 * @param conf configuration 220 * @param frequency filtering frequency 221 */ 222 public static void setFrequency(Configuration conf, int frequency) { 223 if (frequency <= 0) 224 throw new IllegalArgumentException( 225 "Negative " + FILTER_FREQUENCY + ": " + frequency); 226 conf.setInt(FILTER_FREQUENCY, frequency); 227 } 228 229 public MD5Filter() { } 230 231 /** configure the filter according to configuration 232 * 233 * @param conf configuration 234 */ 235 public void setConf(Configuration conf) { 236 this.frequency = conf.getInt(FILTER_FREQUENCY, 10); 237 if (this.frequency <= 0) { 238 throw new RuntimeException( 239 "Negative " + FILTER_FREQUENCY + ": " + this.frequency); 240 } 241 this.conf = conf; 242 } 243 244 /** Filtering method 245 * If MD5(key) % frequency==0, return true; otherwise return false 246 * @see Filter#accept(Object) 247 */ 248 public boolean accept(Object key) { 249 try { 250 long hashcode; 251 if (key instanceof Text) { 252 hashcode = MD5Hashcode((Text)key); 253 } else if (key instanceof BytesWritable) { 254 hashcode = MD5Hashcode((BytesWritable)key); 255 } else { 256 ByteBuffer bb; 257 bb = Text.encode(key.toString()); 258 hashcode = MD5Hashcode(bb.array(), 0, bb.limit()); 259 } 260 if (hashcode / frequency * frequency == hashcode) 261 return true; 262 } catch(Exception e) { 263 LOG.warn(e); 264 throw new RuntimeException(e); 265 } 266 return false; 267 } 268 269 private long MD5Hashcode(Text key) throws DigestException { 270 return MD5Hashcode(key.getBytes(), 0, key.getLength()); 271 } 272 273 private long MD5Hashcode(BytesWritable key) throws DigestException { 274 return MD5Hashcode(key.getBytes(), 0, key.getLength()); 275 } 276 277 synchronized private long MD5Hashcode(byte[] bytes, 278 int start, int length) throws DigestException { 279 DIGESTER.update(bytes, 0, length); 280 DIGESTER.digest(digest, 0, MD5_LEN); 281 long hashcode=0; 282 for (int i = 0; i < 8; i++) 283 hashcode |= ((digest[i] & 0xffL) << (8 * (7 - i))); 284 return hashcode; 285 } 286 } 287 288 private static class FilterRecordReader<K, V> 289 extends SequenceFileRecordReader<K, V> { 290 291 private Filter filter; 292 private K key; 293 private V value; 294 295 public FilterRecordReader(Configuration conf) 296 throws IOException { 297 super(); 298 // instantiate filter 299 filter = (Filter)ReflectionUtils.newInstance( 300 conf.getClass(FILTER_CLASS, PercentFilter.class), conf); 301 } 302 303 public synchronized boolean nextKeyValue() 304 throws IOException, InterruptedException { 305 while (super.nextKeyValue()) { 306 key = super.getCurrentKey(); 307 if (filter.accept(key)) { 308 value = super.getCurrentValue(); 309 return true; 310 } 311 } 312 return false; 313 } 314 315 @Override 316 public K getCurrentKey() { 317 return key; 318 } 319 320 @Override 321 public V getCurrentValue() { 322 return value; 323 } 324 } 325}