001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.fieldsel; 020 021 import java.io.IOException; 022 import java.util.ArrayList; 023 import java.util.List; 024 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 import org.apache.hadoop.classification.InterfaceAudience; 028 import org.apache.hadoop.classification.InterfaceStability; 029 import org.apache.hadoop.conf.Configuration; 030 import org.apache.hadoop.io.Text; 031 import org.apache.hadoop.mapreduce.Reducer; 032 033 /** 034 * This class implements a reducer class that can be used to perform field 035 * selections in a manner similar to unix cut. 036 * 037 * The input data is treated as fields separated by a user specified 038 * separator (the default value is "\t"). The user can specify a list of 039 * fields that form the reduce output keys, and a list of fields that form 040 * the reduce output values. The fields are the union of those from the key 041 * and those from the value. 042 * 043 * The field separator is under attribute "mapreduce.fieldsel.data.field.separator" 044 * 045 * The reduce output field list spec is under attribute 046 * "mapreduce.fieldsel.reduce.output.key.value.fields.spec". 047 * The value is expected to be like 048 * "keyFieldsSpec:valueFieldsSpec" key/valueFieldsSpec are comma (,) 049 * separated field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec 050 * can be a simple number (e.g. 5) specifying a specific field, or a range 051 * (like 2-5) to specify a range of fields, or an open range (like 3-) 052 * specifying all the fields starting from field 3. The open range field 053 * spec applies value fields only. They have no effect on the key fields. 054 * 055 * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 056 * 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values. 057 */ 058 @InterfaceAudience.Public 059 @InterfaceStability.Stable 060 public class FieldSelectionReducer<K, V> 061 extends Reducer<Text, Text, Text, Text> { 062 063 private String fieldSeparator = "\t"; 064 065 private String reduceOutputKeyValueSpec; 066 067 private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>(); 068 069 private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>(); 070 071 private int allReduceValueFieldsFrom = -1; 072 073 public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce"); 074 075 public void setup(Context context) 076 throws IOException, InterruptedException { 077 Configuration conf = context.getConfiguration(); 078 079 this.fieldSeparator = 080 conf.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t"); 081 082 this.reduceOutputKeyValueSpec = 083 conf.get(FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:"); 084 085 allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec( 086 reduceOutputKeyValueSpec, reduceOutputKeyFieldList, 087 reduceOutputValueFieldList); 088 089 LOG.info(FieldSelectionHelper.specToString(fieldSeparator, 090 reduceOutputKeyValueSpec, allReduceValueFieldsFrom, 091 reduceOutputKeyFieldList, reduceOutputValueFieldList)); 092 } 093 094 public void reduce(Text key, Iterable<Text> values, Context context) 095 throws IOException, InterruptedException { 096 String keyStr = key.toString() + this.fieldSeparator; 097 098 for (Text val : values) { 099 FieldSelectionHelper helper = new FieldSelectionHelper(); 100 helper.extractOutputKeyValue(keyStr, val.toString(), 101 fieldSeparator, reduceOutputKeyFieldList, 102 reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false); 103 context.write(helper.getKey(), helper.getValue()); 104 } 105 } 106 }