001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018     
019    package org.apache.hadoop.mapreduce.lib.input;
020    
021    import java.io.IOException;
022    import java.nio.ByteBuffer;
023    import java.security.DigestException;
024    import java.security.MessageDigest;
025    import java.security.NoSuchAlgorithmException;
026    import java.util.regex.Pattern;
027    import java.util.regex.PatternSyntaxException;
028    
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    import org.apache.hadoop.classification.InterfaceAudience;
032    import org.apache.hadoop.classification.InterfaceStability;
033    import org.apache.hadoop.conf.Configurable;
034    import org.apache.hadoop.conf.Configuration;
035    import org.apache.hadoop.io.BytesWritable;
036    import org.apache.hadoop.io.Text;
037    import org.apache.hadoop.mapreduce.InputSplit;
038    import org.apache.hadoop.mapreduce.Job;
039    import org.apache.hadoop.mapreduce.RecordReader;
040    import org.apache.hadoop.mapreduce.TaskAttemptContext;
041    import org.apache.hadoop.util.ReflectionUtils;
042    
043    /**
044     * A class that allows a map/red job to work on a sample of sequence files.
045     * The sample is decided by the filter class set by the job.
046     */
047    @InterfaceAudience.Public
048    @InterfaceStability.Stable
049    public class SequenceFileInputFilter<K, V>
050        extends SequenceFileInputFormat<K, V> {
051      public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
052      
053      final public static String FILTER_CLASS = 
054        "mapreduce.input.sequencefileinputfilter.class";
055      final public static String FILTER_FREQUENCY = 
056        "mapreduce.input.sequencefileinputfilter.frequency";
057      final public static String FILTER_REGEX = 
058        "mapreduce.input.sequencefileinputfilter.regex";
059        
060      public SequenceFileInputFilter() {
061      }
062        
063      /** Create a record reader for the given split
064       * @param split file split
065       * @param context the task-attempt context
066       * @return RecordReader
067       */
068      public RecordReader<K, V> createRecordReader(InputSplit split,
069          TaskAttemptContext context) throws IOException {
070        context.setStatus(split.toString());
071        return new FilterRecordReader<K, V>(context.getConfiguration());
072      }
073    
074    
075      /** set the filter class
076       * 
077       * @param job The job
078       * @param filterClass filter class
079       */
080      public static void setFilterClass(Job job, Class<?> filterClass) {
081        job.getConfiguration().set(FILTER_CLASS, filterClass.getName());
082      }
083    
084             
085      /**
086       * filter interface
087       */
088      public interface Filter extends Configurable {
089        /** filter function
090         * Decide if a record should be filtered or not
091         * @param key record key
092         * @return true if a record is accepted; return false otherwise
093         */
094        public abstract boolean accept(Object key);
095      }
096        
097      /**
098       * base class for Filters
099       */
100      public static abstract class FilterBase implements Filter {
101        Configuration conf;
102            
103        public Configuration getConf() {
104          return conf;
105        }
106      }
107        
108      /** Records filter by matching key to regex
109       */
110      public static class RegexFilter extends FilterBase {
111        private Pattern p;
112        /** Define the filtering regex and stores it in conf
113         * @param conf where the regex is set
114         * @param regex regex used as a filter
115         */
116        public static void setPattern(Configuration conf, String regex)
117            throws PatternSyntaxException {
118          try {
119            Pattern.compile(regex);
120          } catch (PatternSyntaxException e) {
121            throw new IllegalArgumentException("Invalid pattern: "+regex);
122          }
123          conf.set(FILTER_REGEX, regex);
124        }
125            
126        public RegexFilter() { }
127            
128        /** configure the Filter by checking the configuration
129         */
130        public void setConf(Configuration conf) {
131          String regex = conf.get(FILTER_REGEX);
132          if (regex == null)
133            throw new RuntimeException(FILTER_REGEX + "not set");
134          this.p = Pattern.compile(regex);
135          this.conf = conf;
136        }
137    
138    
139        /** Filtering method
140         * If key matches the regex, return true; otherwise return false
141         * @see Filter#accept(Object)
142         */
143        public boolean accept(Object key) {
144          return p.matcher(key.toString()).matches();
145        }
146      }
147    
148      /** This class returns a percentage of records
149       * The percentage is determined by a filtering frequency <i>f</i> using
150       * the criteria record# % f == 0.
151       * For example, if the frequency is 10, one out of 10 records is returned.
152       */
153      public static class PercentFilter extends FilterBase {
154        private int frequency;
155        private int count;
156    
157        /** set the frequency and stores it in conf
158         * @param conf configuration
159         * @param frequency filtering frequencey
160         */
161        public static void setFrequency(Configuration conf, int frequency) {
162          if (frequency <= 0)
163            throw new IllegalArgumentException(
164              "Negative " + FILTER_FREQUENCY + ": " + frequency);
165          conf.setInt(FILTER_FREQUENCY, frequency);
166        }
167            
168        public PercentFilter() { }
169            
170        /** configure the filter by checking the configuration
171         * 
172         * @param conf configuration
173         */
174        public void setConf(Configuration conf) {
175          this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
176          if (this.frequency <= 0) {
177            throw new RuntimeException(
178              "Negative "+FILTER_FREQUENCY + ": " + this.frequency);
179          }
180          this.conf = conf;
181        }
182    
183        /** Filtering method
184         * If record# % frequency==0, return true; otherwise return false
185         * @see Filter#accept(Object)
186         */
187        public boolean accept(Object key) {
188          boolean accepted = false;
189          if (count == 0)
190            accepted = true;
191          if (++count == frequency) {
192            count = 0;
193          }
194          return accepted;
195        }
196      }
197    
198      /** This class returns a set of records by examing the MD5 digest of its
199       * key against a filtering frequency <i>f</i>. The filtering criteria is
200       * MD5(key) % f == 0.
201       */
202      public static class MD5Filter extends FilterBase {
203        private int frequency;
204        private static final MessageDigest DIGESTER;
205        public static final int MD5_LEN = 16;
206        private byte [] digest = new byte[MD5_LEN];
207            
208        static {
209          try {
210            DIGESTER = MessageDigest.getInstance("MD5");
211          } catch (NoSuchAlgorithmException e) {
212            throw new RuntimeException(e);
213          }
214        }
215    
216    
217        /** set the filtering frequency in configuration
218         * 
219         * @param conf configuration
220         * @param frequency filtering frequency
221         */
222        public static void setFrequency(Configuration conf, int frequency) {
223          if (frequency <= 0)
224            throw new IllegalArgumentException(
225              "Negative " + FILTER_FREQUENCY + ": " + frequency);
226          conf.setInt(FILTER_FREQUENCY, frequency);
227        }
228            
229        public MD5Filter() { }
230            
231        /** configure the filter according to configuration
232         * 
233         * @param conf configuration
234         */
235        public void setConf(Configuration conf) {
236          this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
237          if (this.frequency <= 0) {
238            throw new RuntimeException(
239              "Negative " + FILTER_FREQUENCY + ": " + this.frequency);
240          }
241          this.conf = conf;
242        }
243    
244        /** Filtering method
245         * If MD5(key) % frequency==0, return true; otherwise return false
246         * @see Filter#accept(Object)
247         */
248        public boolean accept(Object key) {
249          try {
250            long hashcode;
251            if (key instanceof Text) {
252              hashcode = MD5Hashcode((Text)key);
253            } else if (key instanceof BytesWritable) {
254              hashcode = MD5Hashcode((BytesWritable)key);
255            } else {
256              ByteBuffer bb;
257              bb = Text.encode(key.toString());
258              hashcode = MD5Hashcode(bb.array(), 0, bb.limit());
259            }
260            if (hashcode / frequency * frequency == hashcode)
261              return true;
262          } catch(Exception e) {
263            LOG.warn(e);
264            throw new RuntimeException(e);
265          }
266          return false;
267        }
268            
269        private long MD5Hashcode(Text key) throws DigestException {
270          return MD5Hashcode(key.getBytes(), 0, key.getLength());
271        }
272            
273        private long MD5Hashcode(BytesWritable key) throws DigestException {
274          return MD5Hashcode(key.getBytes(), 0, key.getLength());
275        }
276        
277        synchronized private long MD5Hashcode(byte[] bytes, 
278            int start, int length) throws DigestException {
279          DIGESTER.update(bytes, 0, length);
280          DIGESTER.digest(digest, 0, MD5_LEN);
281          long hashcode=0;
282          for (int i = 0; i < 8; i++)
283            hashcode |= ((digest[i] & 0xffL) << (8 * (7 - i)));
284          return hashcode;
285        }
286      }
287        
288      private static class FilterRecordReader<K, V>
289          extends SequenceFileRecordReader<K, V> {
290        
291        private Filter filter;
292        private K key;
293        private V value;
294            
295        public FilterRecordReader(Configuration conf)
296            throws IOException {
297          super();
298          // instantiate filter
299          filter = (Filter)ReflectionUtils.newInstance(
300            conf.getClass(FILTER_CLASS, PercentFilter.class), conf);
301        }
302        
303        public synchronized boolean nextKeyValue() 
304            throws IOException, InterruptedException {
305          while (super.nextKeyValue()) {
306            key = super.getCurrentKey();
307            if (filter.accept(key)) {
308              value = super.getCurrentValue();
309              return true;
310            }
311          }
312          return false;
313        }
314        
315        @Override
316        public K getCurrentKey() {
317          return key;
318        }
319        
320        @Override
321        public V getCurrentValue() {
322          return value;
323        }
324      }
325    }