001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 
019package org.apache.hadoop.mapreduce.lib.input;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.security.DigestException;
024import java.security.MessageDigest;
025import java.security.NoSuchAlgorithmException;
026import java.util.regex.Pattern;
027import java.util.regex.PatternSyntaxException;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.apache.hadoop.classification.InterfaceAudience;
032import org.apache.hadoop.classification.InterfaceStability;
033import org.apache.hadoop.conf.Configurable;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.io.BytesWritable;
036import org.apache.hadoop.io.Text;
037import org.apache.hadoop.mapreduce.InputSplit;
038import org.apache.hadoop.mapreduce.Job;
039import org.apache.hadoop.mapreduce.RecordReader;
040import org.apache.hadoop.mapreduce.TaskAttemptContext;
041import org.apache.hadoop.util.ReflectionUtils;
042
043/**
044 * A class that allows a map/red job to work on a sample of sequence files.
045 * The sample is decided by the filter class set by the job.
046 */
047@InterfaceAudience.Public
048@InterfaceStability.Stable
049public class SequenceFileInputFilter<K, V>
050    extends SequenceFileInputFormat<K, V> {
051  public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
052  
053  final public static String FILTER_CLASS = 
054    "mapreduce.input.sequencefileinputfilter.class";
055  final public static String FILTER_FREQUENCY = 
056    "mapreduce.input.sequencefileinputfilter.frequency";
057  final public static String FILTER_REGEX = 
058    "mapreduce.input.sequencefileinputfilter.regex";
059    
060  public SequenceFileInputFilter() {
061  }
062    
063  /** Create a record reader for the given split
064   * @param split file split
065   * @param context the task-attempt context
066   * @return RecordReader
067   */
068  public RecordReader<K, V> createRecordReader(InputSplit split,
069      TaskAttemptContext context) throws IOException {
070    context.setStatus(split.toString());
071    return new FilterRecordReader<K, V>(context.getConfiguration());
072  }
073
074
075  /** set the filter class
076   * 
077   * @param job The job
078   * @param filterClass filter class
079   */
080  public static void setFilterClass(Job job, Class<?> filterClass) {
081    job.getConfiguration().set(FILTER_CLASS, filterClass.getName());
082  }
083
084         
085  /**
086   * filter interface
087   */
088  public interface Filter extends Configurable {
089    /** filter function
090     * Decide if a record should be filtered or not
091     * @param key record key
092     * @return true if a record is accepted; return false otherwise
093     */
094    public abstract boolean accept(Object key);
095  }
096    
097  /**
098   * base class for Filters
099   */
100  public static abstract class FilterBase implements Filter {
101    Configuration conf;
102        
103    public Configuration getConf() {
104      return conf;
105    }
106  }
107    
108  /** Records filter by matching key to regex
109   */
110  public static class RegexFilter extends FilterBase {
111    private Pattern p;
112    /** Define the filtering regex and stores it in conf
113     * @param conf where the regex is set
114     * @param regex regex used as a filter
115     */
116    public static void setPattern(Configuration conf, String regex)
117        throws PatternSyntaxException {
118      try {
119        Pattern.compile(regex);
120      } catch (PatternSyntaxException e) {
121        throw new IllegalArgumentException("Invalid pattern: "+regex);
122      }
123      conf.set(FILTER_REGEX, regex);
124    }
125        
126    public RegexFilter() { }
127        
128    /** configure the Filter by checking the configuration
129     */
130    public void setConf(Configuration conf) {
131      String regex = conf.get(FILTER_REGEX);
132      if (regex == null)
133        throw new RuntimeException(FILTER_REGEX + "not set");
134      this.p = Pattern.compile(regex);
135      this.conf = conf;
136    }
137
138
139    /** Filtering method
140     * If key matches the regex, return true; otherwise return false
141     * @see Filter#accept(Object)
142     */
143    public boolean accept(Object key) {
144      return p.matcher(key.toString()).matches();
145    }
146  }
147
148  /** This class returns a percentage of records
149   * The percentage is determined by a filtering frequency <i>f</i> using
150   * the criteria record# % f == 0.
151   * For example, if the frequency is 10, one out of 10 records is returned.
152   */
153  public static class PercentFilter extends FilterBase {
154    private int frequency;
155    private int count;
156
157    /** set the frequency and stores it in conf
158     * @param conf configuration
159     * @param frequency filtering frequencey
160     */
161    public static void setFrequency(Configuration conf, int frequency) {
162      if (frequency <= 0)
163        throw new IllegalArgumentException(
164          "Negative " + FILTER_FREQUENCY + ": " + frequency);
165      conf.setInt(FILTER_FREQUENCY, frequency);
166    }
167        
168    public PercentFilter() { }
169        
170    /** configure the filter by checking the configuration
171     * 
172     * @param conf configuration
173     */
174    public void setConf(Configuration conf) {
175      this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
176      if (this.frequency <= 0) {
177        throw new RuntimeException(
178          "Negative "+FILTER_FREQUENCY + ": " + this.frequency);
179      }
180      this.conf = conf;
181    }
182
183    /** Filtering method
184     * If record# % frequency==0, return true; otherwise return false
185     * @see Filter#accept(Object)
186     */
187    public boolean accept(Object key) {
188      boolean accepted = false;
189      if (count == 0)
190        accepted = true;
191      if (++count == frequency) {
192        count = 0;
193      }
194      return accepted;
195    }
196  }
197
198  /** This class returns a set of records by examing the MD5 digest of its
199   * key against a filtering frequency <i>f</i>. The filtering criteria is
200   * MD5(key) % f == 0.
201   */
202  public static class MD5Filter extends FilterBase {
203    private int frequency;
204    private static final MessageDigest DIGESTER;
205    public static final int MD5_LEN = 16;
206    private byte [] digest = new byte[MD5_LEN];
207        
208    static {
209      try {
210        DIGESTER = MessageDigest.getInstance("MD5");
211      } catch (NoSuchAlgorithmException e) {
212        throw new RuntimeException(e);
213      }
214    }
215
216
217    /** set the filtering frequency in configuration
218     * 
219     * @param conf configuration
220     * @param frequency filtering frequency
221     */
222    public static void setFrequency(Configuration conf, int frequency) {
223      if (frequency <= 0)
224        throw new IllegalArgumentException(
225          "Negative " + FILTER_FREQUENCY + ": " + frequency);
226      conf.setInt(FILTER_FREQUENCY, frequency);
227    }
228        
229    public MD5Filter() { }
230        
231    /** configure the filter according to configuration
232     * 
233     * @param conf configuration
234     */
235    public void setConf(Configuration conf) {
236      this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
237      if (this.frequency <= 0) {
238        throw new RuntimeException(
239          "Negative " + FILTER_FREQUENCY + ": " + this.frequency);
240      }
241      this.conf = conf;
242    }
243
244    /** Filtering method
245     * If MD5(key) % frequency==0, return true; otherwise return false
246     * @see Filter#accept(Object)
247     */
248    public boolean accept(Object key) {
249      try {
250        long hashcode;
251        if (key instanceof Text) {
252          hashcode = MD5Hashcode((Text)key);
253        } else if (key instanceof BytesWritable) {
254          hashcode = MD5Hashcode((BytesWritable)key);
255        } else {
256          ByteBuffer bb;
257          bb = Text.encode(key.toString());
258          hashcode = MD5Hashcode(bb.array(), 0, bb.limit());
259        }
260        if (hashcode / frequency * frequency == hashcode)
261          return true;
262      } catch(Exception e) {
263        LOG.warn(e);
264        throw new RuntimeException(e);
265      }
266      return false;
267    }
268        
269    private long MD5Hashcode(Text key) throws DigestException {
270      return MD5Hashcode(key.getBytes(), 0, key.getLength());
271    }
272        
273    private long MD5Hashcode(BytesWritable key) throws DigestException {
274      return MD5Hashcode(key.getBytes(), 0, key.getLength());
275    }
276    
277    synchronized private long MD5Hashcode(byte[] bytes, 
278        int start, int length) throws DigestException {
279      DIGESTER.update(bytes, 0, length);
280      DIGESTER.digest(digest, 0, MD5_LEN);
281      long hashcode=0;
282      for (int i = 0; i < 8; i++)
283        hashcode |= ((digest[i] & 0xffL) << (8 * (7 - i)));
284      return hashcode;
285    }
286  }
287    
288  private static class FilterRecordReader<K, V>
289      extends SequenceFileRecordReader<K, V> {
290    
291    private Filter filter;
292    private K key;
293    private V value;
294        
295    public FilterRecordReader(Configuration conf)
296        throws IOException {
297      super();
298      // instantiate filter
299      filter = (Filter)ReflectionUtils.newInstance(
300        conf.getClass(FILTER_CLASS, PercentFilter.class), conf);
301    }
302    
303    public synchronized boolean nextKeyValue() 
304        throws IOException, InterruptedException {
305      while (super.nextKeyValue()) {
306        key = super.getCurrentKey();
307        if (filter.accept(key)) {
308          value = super.getCurrentValue();
309          return true;
310        }
311      }
312      return false;
313    }
314    
315    @Override
316    public K getCurrentKey() {
317      return key;
318    }
319    
320    @Override
321    public V getCurrentValue() {
322      return value;
323    }
324  }
325}