/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.regex.PatternSyntaxException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * A class that allows a map/red job to work on a sample of sequence files.
 * The sample is decided by the filter class set by the job.
 *
 * <p>The concrete filters declared here are thin adapters: each one holds an
 * instance of the same-named filter from
 * {@code org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter} and
 * forwards {@code setConf} and {@code accept} to it.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileInputFilter<K, V>
  extends SequenceFileInputFormat<K, V> {

  /** Configuration key naming the filter class; shared with the mapreduce API. */
  private static final String FILTER_CLASS = org.apache.hadoop.mapreduce.lib.
      input.SequenceFileInputFilter.FILTER_CLASS;

  public SequenceFileInputFilter() {
  }

  /** Create a record reader for the given split.
   * The split's string form is reported as the task status before the
   * reader is created.
   *
   * @param split file split; it is cast to {@link FileSplit} without a check
   * @param job job configuration
   * @param reporter reporter who sends report to task tracker
   * @return RecordReader that only returns records accepted by the filter
   * @throws IOException if the underlying reader cannot be created
   */
  @Override
  public RecordReader<K, V> getRecordReader(InputSplit split,
                                      JobConf job, Reporter reporter)
    throws IOException {

    reporter.setStatus(split.toString());

    return new FilterRecordReader<K, V>(job, (FileSplit) split);
  }


  /** Set the filter class.
   * Stores the class name in the configuration under the filter-class key.
   *
   * @param conf application configuration
   * @param filterClass filter class
   */
  public static void setFilterClass(Configuration conf, Class<?> filterClass) {
    conf.set(FILTER_CLASS, filterClass.getName());
  }


  /**
   * Filter interface; the mapred-side view of the mapreduce filter interface.
   */
  public interface Filter extends
      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.Filter {
  }

  /**
   * Base class for Filters.
   */
  public static abstract class FilterBase extends org.apache.hadoop.mapreduce.
      lib.input.SequenceFileInputFilter.FilterBase
      implements Filter {
  }

  /** Records filter by matching key to regex.
   */
  public static class RegexFilter extends FilterBase {
    /** Delegate that performs the actual matching. */
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
      RegexFilter rf;

    /** Store the filtering regex in the configuration.
     *
     * @param conf configuration
     * @param regex regular expression that keys are matched against
     * @throws PatternSyntaxException if the delegate rejects the expression
     */
    public static void setPattern(Configuration conf, String regex)
        throws PatternSyntaxException {
      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
        RegexFilter.setPattern(conf, regex);
    }

    public RegexFilter() {
      rf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
        RegexFilter();
    }

    /** Configure the Filter by checking the configuration.
     *
     * @param conf configuration
     */
    @Override
    public void setConf(Configuration conf) {
      rf.setConf(conf);
    }


    /** Filtering method.
     * If key matches the regex, return true; otherwise return false.
     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
     */
    @Override
    public boolean accept(Object key) {
      return rf.accept(key);
    }
  }

  /** This class returns a percentage of records.
   * The percentage is determined by a filtering frequency <i>f</i> using
   * the criteria record# % f == 0.
   * For example, if the frequency is 10, one out of 10 records is returned.
   */
  public static class PercentFilter extends FilterBase {
    /** Delegate that performs the actual filtering. */
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
      PercentFilter pf;

    /** Set the frequency and store it in conf.
     * @param conf configuration
     * @param frequency filtering frequency
     */
    public static void setFrequency(Configuration conf, int frequency) {
      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
        PercentFilter.setFrequency(conf, frequency);
    }

    public PercentFilter() {
      pf = new org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.
        PercentFilter();
    }

    /** Configure the filter by checking the configuration.
     *
     * @param conf configuration
     */
    @Override
    public void setConf(Configuration conf) {
      pf.setConf(conf);
    }

    /** Filtering method.
     * If record# % frequency==0, return true; otherwise return false.
     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
     */
    @Override
    public boolean accept(Object key) {
      return pf.accept(key);
    }
  }

  /** This class returns a set of records by examining the MD5 digest of its
   * key against a filtering frequency <i>f</i>. The filtering criteria is
   * MD5(key) % f == 0.
   */
  public static class MD5Filter extends FilterBase {
    /** Re-exported from the mapreduce MD5Filter. */
    public static final int MD5_LEN = org.apache.hadoop.mapreduce.lib.
      input.SequenceFileInputFilter.MD5Filter.MD5_LEN;
    /** Delegate that performs the actual filtering. */
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter mf;

    /** Set the filtering frequency in configuration.
     *
     * @param conf configuration
     * @param frequency filtering frequency
     */
    public static void setFrequency(Configuration conf, int frequency) {
      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter.MD5Filter.
        setFrequency(conf, frequency);
    }

    public MD5Filter() {
      mf = new org.apache.hadoop.mapreduce.lib.input.
        SequenceFileInputFilter.MD5Filter();
    }

    /** Configure the filter according to configuration.
     *
     * @param conf configuration
     */
    @Override
    public void setConf(Configuration conf) {
      mf.setConf(conf);
    }

    /** Filtering method.
     * If MD5(key) % frequency==0, return true; otherwise return false.
     * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
     */
    @Override
    public boolean accept(Object key) {
      return mf.accept(key);
    }
  }

  /**
   * Record reader that skips every record whose key the configured filter
   * does not accept.
   */
  private static class FilterRecordReader<K, V>
    extends SequenceFileRecordReader<K, V> {

    /** Filter instantiated from the job configuration. */
    private final Filter filter;

    /**
     * @param conf configuration the filter class is looked up in
     * @param split file split to read
     * @throws IOException if the underlying reader cannot be opened
     */
    public FilterRecordReader(Configuration conf, FileSplit split)
      throws IOException {
      super(conf, split);
      // instantiate filter; PercentFilter is the fallback passed to getClass
      filter = (Filter)ReflectionUtils.newInstance(
        conf.getClass(FILTER_CLASS, PercentFilter.class),
        conf);
    }

    /**
     * Advance to the next record whose key is accepted by the filter.
     * Keys are read one at a time; the value is only fetched once a key
     * has been accepted.
     *
     * @param key object the next key is read into
     * @param value object the accepted record's value is read into
     * @return true if an accepted record was read, false once the input
     *         is exhausted
     * @throws IOException if reading from the underlying file fails
     */
    @Override
    public synchronized boolean next(K key, V value) throws IOException {
      while (next(key)) {
        if (filter.accept(key)) {
          getCurrentValue(value);
          return true;
        }
      }

      return false;
    }
  }
}