001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 
019package org.apache.hadoop.mapred;
020
021import java.io.IOException;
022import java.util.regex.PatternSyntaxException;
023
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.util.ReflectionUtils;
028
029/**
030 * A class that allows a map/red job to work on a sample of sequence files.
031 * The sample is decided by the filter class set by the job.
032 */
033@InterfaceAudience.Public
034@InterfaceStability.Stable
035public class SequenceFileInputFilter<K, V>
036  extends SequenceFileInputFormat<K, V> {
037  
038  final private static String FILTER_CLASS = org.apache.hadoop.mapreduce.lib.
039      input.SequenceFileInputFilter.FILTER_CLASS;
040
041  public SequenceFileInputFilter() {
042  }
043    
044  /** Create a record reader for the given split
045   * @param split file split
046   * @param job job configuration
047   * @param reporter reporter who sends report to task tracker
048   * @return RecordReader
049   */
050  public RecordReader<K, V> getRecordReader(InputSplit split,
051                                      JobConf job, Reporter reporter)
052    throws IOException {
053        
054    reporter.setStatus(split.toString());
055        
056    return new FilterRecordReader<K, V>(job, (FileSplit) split);
057  }
058
059
060  /** set the filter class
061   * 
062   * @param conf application configuration
063   * @param filterClass filter class
064   */
065  public static void setFilterClass(Configuration conf, Class filterClass) {
066    conf.set(FILTER_CLASS, filterClass.getName());
067  }
068
069         
070  /**
071   * filter interface
072   */
073  public interface Filter extends 
074      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.Filter {
075  }
076    
077  /**
078   * base class for Filters
079   */
080  public static abstract class FilterBase extends org.apache.hadoop.mapreduce.
081      lib.input.SequenceFileInputFilter.FilterBase
082      implements Filter {
083  }
084    
085  /** Records filter by matching key to regex
086   */
087  public static class RegexFilter extends FilterBase {
088    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
089      RegexFilter rf;
090    public static void setPattern(Configuration conf, String regex)
091        throws PatternSyntaxException {
092      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
093        RegexFilter.setPattern(conf, regex);
094    }
095        
096    public RegexFilter() { 
097      rf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
098             RegexFilter();
099    }
100        
101    /** configure the Filter by checking the configuration
102     */
103    public void setConf(Configuration conf) {
104      rf.setConf(conf);
105    }
106
107
108    /** Filtering method
109     * If key matches the regex, return true; otherwise return false
110     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
111     */
112    public boolean accept(Object key) {
113      return rf.accept(key);
114    }
115  }
116
117  /** This class returns a percentage of records
118   * The percentage is determined by a filtering frequency <i>f</i> using
119   * the criteria record# % f == 0.
120   * For example, if the frequency is 10, one out of 10 records is returned.
121   */
122  public static class PercentFilter extends FilterBase {
123    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
124              PercentFilter pf;
125    /** set the frequency and stores it in conf
126     * @param conf configuration
127     * @param frequency filtering frequencey
128     */
129    public static void setFrequency(Configuration conf, int frequency) {
130       org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
131              PercentFilter.setFrequency(conf, frequency);
132    }
133                
134    public PercentFilter() { 
135      pf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
136        PercentFilter();
137    }
138                
139    /** configure the filter by checking the configuration
140     * 
141     * @param conf configuration
142     */
143    public void setConf(Configuration conf) {
144      pf.setConf(conf);
145    }
146
147    /** Filtering method
148     * If record# % frequency==0, return true; otherwise return false
149     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
150     */
151    public boolean accept(Object key) {
152      return pf.accept(key);
153    }
154  }
155
156  /** This class returns a set of records by examing the MD5 digest of its
157   * key against a filtering frequency <i>f</i>. The filtering criteria is
158   * MD5(key) % f == 0.
159   */
160  public static class MD5Filter extends FilterBase {
161    public static final int MD5_LEN = org.apache.hadoop.mapreduce.lib.
162      input.SequenceFileInputFilter.MD5Filter.MD5_LEN;
163    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter mf;
164    /** set the filtering frequency in configuration
165     * 
166     * @param conf configuration
167     * @param frequency filtering frequency
168     */
169    public static void setFrequency(Configuration conf, int frequency) {
170      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter.
171        setFrequency(conf, frequency);
172    }
173        
174    public MD5Filter() { 
175      mf = new org.apache.hadoop.mapreduce.lib.input.
176        SequenceFileInputFilter.MD5Filter();
177    }
178        
179    /** configure the filter according to configuration
180     * 
181     * @param conf configuration
182     */
183    public void setConf(Configuration conf) {
184      mf.setConf(conf);
185    }
186
187    /** Filtering method
188     * If MD5(key) % frequency==0, return true; otherwise return false
189     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
190     */
191    public boolean accept(Object key) {
192      return mf.accept(key);
193    }
194  }
195    
196  private static class FilterRecordReader<K, V>
197    extends SequenceFileRecordReader<K, V> {
198    
199    private Filter filter;
200        
201    public FilterRecordReader(Configuration conf, FileSplit split)
202      throws IOException {
203      super(conf, split);
204      // instantiate filter
205      filter = (Filter)ReflectionUtils.newInstance(
206                                                   conf.getClass(FILTER_CLASS, PercentFilter.class), 
207                                                   conf);
208    }
209        
210    public synchronized boolean next(K key, V value) throws IOException {
211      while (next(key)) {
212        if (filter.accept(key)) {
213          getCurrentValue(value);
214          return true;
215        }
216      }
217            
218      return false;
219    }
220  }
221}