001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.lib;
020
021 import java.io.IOException;
022 import java.util.ArrayList;
023 import java.util.Iterator;
024 import java.util.List;
025
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.io.Text;
031 import org.apache.hadoop.mapred.JobConf;
032 import org.apache.hadoop.mapred.Mapper;
033 import org.apache.hadoop.mapred.OutputCollector;
034 import org.apache.hadoop.mapred.Reducer;
035 import org.apache.hadoop.mapred.Reporter;
036 import org.apache.hadoop.mapred.TextInputFormat;
037 import org.apache.hadoop.mapreduce.lib.fieldsel.*;
038
039 /**
040 * This class implements a mapper/reducer class that can be used to perform
041 * field selections in a manner similar to unix cut. The input data is treated
042 * as fields separated by a user specified separator (the default value is
043 * "\t"). The user can specify a list of fields that form the map output keys,
044 * and a list of fields that form the map output values. If the inputformat is
045 * TextInputFormat, the mapper will ignore the key to the map function. and the
046 * fields are from the value only. Otherwise, the fields are the union of those
047 * from the key and those from the value.
048 *
049 * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
050 *
051 * The map output field list spec is under attribute
052 * "mapreduce.fieldsel.map.output.key.value.fields.spec".
053 * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
054 * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
055 * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
056 * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all
057 * the fields starting from field 3. The open range field spec applies value fields only.
058 * They have no effect on the key fields.
059 *
060 * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
061 * and use fields 6,5,1,2,3,7 and above for values.
062 *
063 * The reduce output field list spec is under attribute
064 * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
065 *
066 * The reducer extracts output key/value pairs in a similar manner, except that
067 * the key is never ignored.
068 */
069 @InterfaceAudience.Public
070 @InterfaceStability.Stable
071 public class FieldSelectionMapReduce<K, V>
072 implements Mapper<K, V, Text, Text>, Reducer<Text, Text, Text, Text> {
073
074 private String mapOutputKeyValueSpec;
075
076 private boolean ignoreInputKey;
077
078 private String fieldSeparator = "\t";
079
080 private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
081
082 private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
083
084 private int allMapValueFieldsFrom = -1;
085
086 private String reduceOutputKeyValueSpec;
087
088 private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();
089
090 private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();
091
092 private int allReduceValueFieldsFrom = -1;
093
094
095 public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
096
097 private String specToString() {
098 StringBuffer sb = new StringBuffer();
099 sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
100
101 sb.append("mapOutputKeyValueSpec: ").append(mapOutputKeyValueSpec).append(
102 "\n");
103 sb.append("reduceOutputKeyValueSpec: ").append(reduceOutputKeyValueSpec)
104 .append("\n");
105
106 sb.append("allMapValueFieldsFrom: ").append(allMapValueFieldsFrom).append(
107 "\n");
108
109 sb.append("allReduceValueFieldsFrom: ").append(allReduceValueFieldsFrom)
110 .append("\n");
111
112 int i = 0;
113
114 sb.append("mapOutputKeyFieldList.length: ").append(
115 mapOutputKeyFieldList.size()).append("\n");
116 for (i = 0; i < mapOutputKeyFieldList.size(); i++) {
117 sb.append("\t").append(mapOutputKeyFieldList.get(i)).append("\n");
118 }
119 sb.append("mapOutputValueFieldList.length: ").append(
120 mapOutputValueFieldList.size()).append("\n");
121 for (i = 0; i < mapOutputValueFieldList.size(); i++) {
122 sb.append("\t").append(mapOutputValueFieldList.get(i)).append("\n");
123 }
124
125 sb.append("reduceOutputKeyFieldList.length: ").append(
126 reduceOutputKeyFieldList.size()).append("\n");
127 for (i = 0; i < reduceOutputKeyFieldList.size(); i++) {
128 sb.append("\t").append(reduceOutputKeyFieldList.get(i)).append("\n");
129 }
130 sb.append("reduceOutputValueFieldList.length: ").append(
131 reduceOutputValueFieldList.size()).append("\n");
132 for (i = 0; i < reduceOutputValueFieldList.size(); i++) {
133 sb.append("\t").append(reduceOutputValueFieldList.get(i)).append("\n");
134 }
135 return sb.toString();
136 }
137
138 /**
139 * The identify function. Input key/value pair is written directly to output.
140 */
141 public void map(K key, V val,
142 OutputCollector<Text, Text> output, Reporter reporter)
143 throws IOException {
144 FieldSelectionHelper helper = new FieldSelectionHelper(
145 FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
146 helper.extractOutputKeyValue(key.toString(), val.toString(),
147 fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
148 allMapValueFieldsFrom, ignoreInputKey, true);
149 output.collect(helper.getKey(), helper.getValue());
150 }
151
152 private void parseOutputKeyValueSpec() {
153 allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
154 mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
155
156 allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
157 reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
158 reduceOutputValueFieldList);
159 }
160
161 public void configure(JobConf job) {
162 this.fieldSeparator = job.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR,
163 "\t");
164 this.mapOutputKeyValueSpec = job.get(
165 FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
166 this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
167 job.getInputFormat().getClass().getCanonicalName());
168 this.reduceOutputKeyValueSpec = job.get(
169 FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");
170 parseOutputKeyValueSpec();
171 LOG.info(specToString());
172 }
173
174 public void close() throws IOException {
175 // TODO Auto-generated method stub
176
177 }
178
179 public void reduce(Text key, Iterator<Text> values,
180 OutputCollector<Text, Text> output, Reporter reporter)
181 throws IOException {
182 String keyStr = key.toString() + this.fieldSeparator;
183 while (values.hasNext()) {
184 FieldSelectionHelper helper = new FieldSelectionHelper();
185 helper.extractOutputKeyValue(keyStr, values.next().toString(),
186 fieldSeparator, reduceOutputKeyFieldList,
187 reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
188 output.collect(helper.getKey(), helper.getValue());
189 }
190 }
191 }