/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.fieldsel.*;

/**
 * This class implements a mapper/reducer class that can be used to perform
 * field selections in a manner similar to unix cut. The input data is treated
 * as fields separated by a user specified separator (the default value is
 * "\t"). The user can specify a list of fields that form the map output keys,
 * and a list of fields that form the map output values. If the input format is
 * TextInputFormat, the mapper ignores the key passed to the map function and
 * the fields are taken from the value only. Otherwise, the fields are the
 * union of those from the key and those from the value.
 *
 * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
 *
 * The map output field list spec is under attribute
 * "mapreduce.fieldsel.map.output.key.value.fields.spec".
 * The value is expected to be like "keyFieldsSpec:valueFieldsSpec".
 * key/valueFieldsSpec are comma (,) separated field specs: fieldSpec,fieldSpec,fieldSpec ...
 * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
 * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all
 * the fields starting from field 3. Open range field specs apply to value fields only;
 * they have no effect on the key fields.
 *
 * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
 * and use fields 6,5,1,2,3,7 and above for values.
 *
 * The reduce output field list spec is under attribute
 * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
 *
 * The reducer extracts output key/value pairs in a similar manner, except that
 * the key is never ignored.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FieldSelectionMapReduce<K, V>
    implements Mapper<K, V, Text, Text>, Reducer<Text, Text, Text, Text> {

  /** Raw "keyFieldsSpec:valueFieldsSpec" string for the map side, as read in configure(). */
  private String mapOutputKeyValueSpec;

  /** True when the job's input format is TextInputFormat (see configure()). */
  private boolean ignoreInputKey;

  /** Separator used to split records into fields; defaults to a tab. */
  private String fieldSeparator = "\t";

  /** Field indexes selected for the map output key, filled by parseOutputKeyValueSpec(). */
  private final List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();

  /** Field indexes selected for the map output value, filled by parseOutputKeyValueSpec(). */
  private final List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();

  /**
   * Value returned by the helper's spec parser for the map spec; initialised
   * to -1 until configure() runs. Presumably the start of an open range
   * (e.g. the 7 in "7-") -- confirm against FieldSelectionHelper.
   */
  private int allMapValueFieldsFrom = -1;

  /** Raw "keyFieldsSpec:valueFieldsSpec" string for the reduce side, as read in configure(). */
  private String reduceOutputKeyValueSpec;

  /** Field indexes selected for the reduce output key, filled by parseOutputKeyValueSpec(). */
  private final List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();

  /** Field indexes selected for the reduce output value, filled by parseOutputKeyValueSpec(). */
  private final List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();

  /** Reduce-side counterpart of {@link #allMapValueFieldsFrom}. */
  private int allReduceValueFieldsFrom = -1;

  public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");

  /**
   * Renders the current configuration (separator, raw specs, open-range
   * markers and the four parsed field lists) as a multi-line string.
   * Used by configure() to log the effective settings.
   *
   * @return a human-readable dump of this instance's field selection settings
   */
  private String specToString() {
    // A local buffer needs no synchronization, hence StringBuilder.
    StringBuilder sb = new StringBuilder();
    sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
    sb.append("mapOutputKeyValueSpec: ").append(mapOutputKeyValueSpec)
        .append("\n");
    sb.append("reduceOutputKeyValueSpec: ").append(reduceOutputKeyValueSpec)
        .append("\n");
    sb.append("allMapValueFieldsFrom: ").append(allMapValueFieldsFrom)
        .append("\n");
    sb.append("allReduceValueFieldsFrom: ").append(allReduceValueFieldsFrom)
        .append("\n");

    appendFieldList(sb, "mapOutputKeyFieldList", mapOutputKeyFieldList);
    appendFieldList(sb, "mapOutputValueFieldList", mapOutputValueFieldList);
    appendFieldList(sb, "reduceOutputKeyFieldList", reduceOutputKeyFieldList);
    appendFieldList(sb, "reduceOutputValueFieldList",
        reduceOutputValueFieldList);
    return sb.toString();
  }

  /**
   * Appends one field list to the dump: a "name.length: N" header line
   * followed by one tab-indented line per field index.
   *
   * @param sb buffer to append to
   * @param name label of the list, printed before ".length: "
   * @param fields the field indexes to print
   */
  private static void appendFieldList(StringBuilder sb, String name,
      List<Integer> fields) {
    sb.append(name).append(".length: ").append(fields.size()).append("\n");
    for (Integer field : fields) {
      sb.append("\t").append(field).append("\n");
    }
  }

  /**
   * Selects fields from the input record and emits them as one key/value pair.
   * The actual extraction is delegated to
   * FieldSelectionHelper.extractOutputKeyValue, driven by the map-side field
   * lists parsed in configure(). Exactly one pair is collected per call.
   *
   * Note that key.toString() is evaluated even when the input key is to be
   * ignored, so neither key nor val may be null.
   *
   * @param key the input key
   * @param val the input value
   * @param output collector receiving the selected key/value pair
   * @param reporter not used by this method
   * @throws IOException if the collector fails
   */
  public void map(K key, V val,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    FieldSelectionHelper helper = new FieldSelectionHelper(
        FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
    // Trailing 'true' presumably flags the map-side call -- confirm against
    // FieldSelectionHelper.extractOutputKeyValue.
    helper.extractOutputKeyValue(key.toString(), val.toString(),
        fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
        allMapValueFieldsFrom, ignoreInputKey, true);
    output.collect(helper.getKey(), helper.getValue());
  }

  /**
   * Parses the map and reduce "keyFieldsSpec:valueFieldsSpec" strings into
   * their key/value field lists and records the value each parse returns.
   */
  private void parseOutputKeyValueSpec() {
    allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
        mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);

    allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
        reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
        reduceOutputValueFieldList);
  }

  /**
   * Reads the field selection settings from the job configuration, parses
   * them and logs the result.
   *
   * Defaults: the separator is a tab and both the map and reduce specs are
   * "0-:". The input key is ignored by map() exactly when the configured
   * input format's class has the same canonical name as TextInputFormat.
   *
   * @param job the job configuration to read from
   */
  public void configure(JobConf job) {
    this.fieldSeparator = job.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR,
        "\t");
    this.mapOutputKeyValueSpec = job.get(
        FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
    this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
        job.getInputFormat().getClass().getCanonicalName());
    this.reduceOutputKeyValueSpec = job.get(
        FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");
    parseOutputKeyValueSpec();
    LOG.info(specToString());
  }

  /**
   * No-op: this class holds no resources that need releasing.
   *
   * @throws IOException never thrown by this implementation
   */
  public void close() throws IOException {
    // Intentionally empty.
  }

  /**
   * Selects fields from each value of a reduce group and emits one key/value
   * pair per value. The key is never ignored: the key text, followed by the
   * field separator, is passed together with each value to
   * FieldSelectionHelper.extractOutputKeyValue.
   *
   * @param key the reduce key
   * @param values the values grouped under the key
   * @param output collector receiving one selected pair per input value
   * @param reporter not used by this method
   * @throws IOException if the collector fails
   */
  public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    // Append the separator once, outside the loop, so the key's last field
    // stays distinct from the text that follows it.
    String keyStr = key.toString() + this.fieldSeparator;
    while (values.hasNext()) {
      // A fresh helper per value, as in the original code.
      FieldSelectionHelper helper = new FieldSelectionHelper();
      helper.extractOutputKeyValue(keyStr, values.next().toString(),
          fieldSeparator, reduceOutputKeyFieldList,
          reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
      output.collect(helper.getKey(), helper.getValue());
    }
  }
}