/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.regex.PatternSyntaxException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * A class that allows a map/reduce job to work on a sample of sequence files.
 * The sample is decided by the filter class set by the job.
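 *
 * <p>A usage sketch; the input path and regular expression below are
 * illustrative, not part of the API:
 * <pre>
 *   JobConf job = new JobConf();
 *   job.setInputFormat(SequenceFileInputFilter.class);
 *   FileInputFormat.setInputPaths(job, new Path("/illustrative/input"));
 *   // sample only the records whose keys match the regular expression
 *   SequenceFileInputFilter.setFilterClass(job, SequenceFileInputFilter.RegexFilter.class);
 *   SequenceFileInputFilter.RegexFilter.setPattern(job, "part-.*");
 * </pre>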
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileInputFilter<K, V>
  extends SequenceFileInputFormat<K, V> {

  private static final String FILTER_CLASS = org.apache.hadoop.mapreduce.lib.
      input.SequenceFileInputFilter.FILTER_CLASS;

  public SequenceFileInputFilter() {
  }

  /** Create a record reader for the given split.
   * @param split file split
   * @param job job configuration
   * @param reporter reporter that sends progress reports to the task tracker
   * @return RecordReader
   */
  @Override
  public RecordReader<K, V> getRecordReader(InputSplit split,
                                            JobConf job, Reporter reporter)
    throws IOException {

    reporter.setStatus(split.toString());

    return new FilterRecordReader<K, V>(job, (FileSplit) split);
  }

  /** Set the filter class.
   *
   * @param conf application configuration
   * @param filterClass filter class
   */
  public static void setFilterClass(Configuration conf, Class filterClass) {
    conf.set(FILTER_CLASS, filterClass.getName());
  }

  /**
   * Filter interface.
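   *
   * <p>A custom filter can extend {@link FilterBase} and implement
   * <code>setConf(Configuration)</code> and <code>accept(Object)</code>.
   * A minimal sketch; the class name and the even-hash rule are illustrative:
   * <pre>
   *   public static class EvenHashFilter extends FilterBase {
   *     public void setConf(Configuration conf) { }
   *     public boolean accept(Object key) {
   *       return (key.hashCode() &amp; 1) == 0;
   *     }
   *   }
   * </pre>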
   */
  public interface Filter extends
      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.Filter {
  }

  /**
   * Base class for Filters.
   */
  public static abstract class FilterBase extends org.apache.hadoop.mapreduce.
      lib.input.SequenceFileInputFilter.FilterBase
      implements Filter {
  }

  /** Filters records by matching their keys against a regular expression.
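   *
   * <p>For example, assuming <code>conf</code> is the job {@link Configuration}
   * and the pattern is illustrative:
   * <pre>
   *   SequenceFileInputFilter.setFilterClass(conf, SequenceFileInputFilter.RegexFilter.class);
   *   SequenceFileInputFilter.RegexFilter.setPattern(conf, "part-.*");
   * </pre>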
   */
  public static class RegexFilter extends FilterBase {
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
      RegexFilter rf;

    public static void setPattern(Configuration conf, String regex)
        throws PatternSyntaxException {
      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
        RegexFilter.setPattern(conf, regex);
    }

    public RegexFilter() {
      rf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
             RegexFilter();
    }

    /** Configure the filter from the given configuration.
     */
    public void setConf(Configuration conf) {
      rf.setConf(conf);
    }

    /** Filtering method.
     * If the key matches the regex, return true; otherwise return false.
     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
     */
    public boolean accept(Object key) {
      return rf.accept(key);
    }
  }

  /** This class returns a percentage of the records.
   * The percentage is determined by a filtering frequency <i>f</i> using
   * the criterion record# % f == 0.
   * For example, if the frequency is 10, one out of every 10 records is returned.
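   *
   * <p>For example, assuming <code>conf</code> is the job {@link Configuration},
   * to sample roughly one record in ten:
   * <pre>
   *   SequenceFileInputFilter.setFilterClass(conf, SequenceFileInputFilter.PercentFilter.class);
   *   SequenceFileInputFilter.PercentFilter.setFrequency(conf, 10);
   * </pre>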
   */
  public static class PercentFilter extends FilterBase {
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
              PercentFilter pf;

    /** Set the frequency and store it in conf.
     * @param conf configuration
     * @param frequency filtering frequency
     */
    public static void setFrequency(Configuration conf, int frequency) {
      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
              PercentFilter.setFrequency(conf, frequency);
    }

    public PercentFilter() {
      pf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
        PercentFilter();
    }

    /** Configure the filter by checking the configuration.
     *
     * @param conf configuration
     */
    public void setConf(Configuration conf) {
      pf.setConf(conf);
    }

    /** Filtering method.
     * If record# % frequency == 0, return true; otherwise return false.
     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
     */
    public boolean accept(Object key) {
      return pf.accept(key);
    }
  }

  /** This class returns a set of records by examining the MD5 digest of each
   * key against a filtering frequency <i>f</i>. The filtering criterion is
   * MD5(key) % f == 0.
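   *
   * <p>For example, assuming <code>conf</code> is the job {@link Configuration},
   * to keep only keys whose MD5 digest is divisible by 10:
   * <pre>
   *   SequenceFileInputFilter.setFilterClass(conf, SequenceFileInputFilter.MD5Filter.class);
   *   SequenceFileInputFilter.MD5Filter.setFrequency(conf, 10);
   * </pre>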
   */
  public static class MD5Filter extends FilterBase {
    public static final int MD5_LEN = org.apache.hadoop.mapreduce.lib.
      input.SequenceFileInputFilter.MD5Filter.MD5_LEN;
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter mf;

    /** Set the filtering frequency in the configuration.
     *
     * @param conf configuration
     * @param frequency filtering frequency
     */
    public static void setFrequency(Configuration conf, int frequency) {
      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter.
        setFrequency(conf, frequency);
    }

    public MD5Filter() {
      mf = new org.apache.hadoop.mapreduce.lib.input.
        SequenceFileInputFilter.MD5Filter();
    }

    /** Configure the filter according to the configuration.
     *
     * @param conf configuration
     */
    public void setConf(Configuration conf) {
      mf.setConf(conf);
    }

    /** Filtering method.
     * If MD5(key) % frequency == 0, return true; otherwise return false.
     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
     */
    public boolean accept(Object key) {
      return mf.accept(key);
    }
  }

  private static class FilterRecordReader<K, V>
    extends SequenceFileRecordReader<K, V> {

    private Filter filter;

    public FilterRecordReader(Configuration conf, FileSplit split)
      throws IOException {
      super(conf, split);
      // instantiate the configured filter, defaulting to PercentFilter
      filter = (Filter)ReflectionUtils.newInstance(
          conf.getClass(FILTER_CLASS, PercentFilter.class), conf);
    }

    public synchronized boolean next(K key, V value) throws IOException {
      // skip records until the filter accepts a key or the input is exhausted
      while (next(key)) {
        if (filter.accept(key)) {
          getCurrentValue(value);
          return true;
        }
      }

      return false;
    }
  }
}