001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapred.lib;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.List;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.io.Text;
031import org.apache.hadoop.mapred.JobConf;
032import org.apache.hadoop.mapred.Mapper;
033import org.apache.hadoop.mapred.OutputCollector;
034import org.apache.hadoop.mapred.Reducer;
035import org.apache.hadoop.mapred.Reporter;
036import org.apache.hadoop.mapred.TextInputFormat;
037import org.apache.hadoop.mapreduce.lib.fieldsel.*;
038
039/**
040 * This class implements a mapper/reducer class that can be used to perform
041 * field selections in a manner similar to unix cut. The input data is treated
042 * as fields separated by a user specified separator (the default value is
043 * "\t"). The user can specify a list of fields that form the map output keys,
044 * and a list of fields that form the map output values. If the inputformat is
 * TextInputFormat, the mapper will ignore the key to the map function, and the
046 * fields are from the value only. Otherwise, the fields are the union of those
047 * from the key and those from the value.
048 * 
049 * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
050 * 
051 * The map output field list spec is under attribute 
052 * "mapreduce.fieldsel.map.output.key.value.fields.spec".
053 * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
054 * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
055 * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
056 * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all 
 * the fields starting from field 3. Open range field specs apply to value fields only;
 * they have no effect on the key fields.
059 * 
060 * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
061 * and use fields 6,5,1,2,3,7 and above for values.
062 * 
063 * The reduce output field list spec is under attribute 
064 * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
065 * 
066 * The reducer extracts output key/value pairs in a similar manner, except that
067 * the key is never ignored.
068 */
069@InterfaceAudience.Public
070@InterfaceStability.Stable
071public class FieldSelectionMapReduce<K, V>
072    implements Mapper<K, V, Text, Text>, Reducer<Text, Text, Text, Text> {
073
074  private String mapOutputKeyValueSpec;
075
076  private boolean ignoreInputKey;
077
078  private String fieldSeparator = "\t";
079
080  private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
081
082  private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
083
084  private int allMapValueFieldsFrom = -1;
085
086  private String reduceOutputKeyValueSpec;
087
088  private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();
089
090  private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();
091
092  private int allReduceValueFieldsFrom = -1;
093
094
095  public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
096
097  private String specToString() {
098    StringBuffer sb = new StringBuffer();
099    sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
100
101    sb.append("mapOutputKeyValueSpec: ").append(mapOutputKeyValueSpec).append(
102        "\n");
103    sb.append("reduceOutputKeyValueSpec: ").append(reduceOutputKeyValueSpec)
104        .append("\n");
105
106    sb.append("allMapValueFieldsFrom: ").append(allMapValueFieldsFrom).append(
107        "\n");
108
109    sb.append("allReduceValueFieldsFrom: ").append(allReduceValueFieldsFrom)
110        .append("\n");
111
112    int i = 0;
113
114    sb.append("mapOutputKeyFieldList.length: ").append(
115        mapOutputKeyFieldList.size()).append("\n");
116    for (i = 0; i < mapOutputKeyFieldList.size(); i++) {
117      sb.append("\t").append(mapOutputKeyFieldList.get(i)).append("\n");
118    }
119    sb.append("mapOutputValueFieldList.length: ").append(
120        mapOutputValueFieldList.size()).append("\n");
121    for (i = 0; i < mapOutputValueFieldList.size(); i++) {
122      sb.append("\t").append(mapOutputValueFieldList.get(i)).append("\n");
123    }
124
125    sb.append("reduceOutputKeyFieldList.length: ").append(
126        reduceOutputKeyFieldList.size()).append("\n");
127    for (i = 0; i < reduceOutputKeyFieldList.size(); i++) {
128      sb.append("\t").append(reduceOutputKeyFieldList.get(i)).append("\n");
129    }
130    sb.append("reduceOutputValueFieldList.length: ").append(
131        reduceOutputValueFieldList.size()).append("\n");
132    for (i = 0; i < reduceOutputValueFieldList.size(); i++) {
133      sb.append("\t").append(reduceOutputValueFieldList.get(i)).append("\n");
134    }
135    return sb.toString();
136  }
137
138  /**
139   * The identify function. Input key/value pair is written directly to output.
140   */
141  public void map(K key, V val,
142      OutputCollector<Text, Text> output, Reporter reporter) 
143      throws IOException {
144    FieldSelectionHelper helper = new FieldSelectionHelper(
145      FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
146    helper.extractOutputKeyValue(key.toString(), val.toString(),
147      fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
148      allMapValueFieldsFrom, ignoreInputKey, true);
149    output.collect(helper.getKey(), helper.getValue());
150  }
151
152  private void parseOutputKeyValueSpec() {
153    allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
154      mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
155    
156    allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
157      reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
158      reduceOutputValueFieldList);
159  }
160
161  public void configure(JobConf job) {
162    this.fieldSeparator = job.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR,
163        "\t");
164    this.mapOutputKeyValueSpec = job.get(
165        FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
166    this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
167        job.getInputFormat().getClass().getCanonicalName());
168    this.reduceOutputKeyValueSpec = job.get(
169        FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");
170    parseOutputKeyValueSpec();
171    LOG.info(specToString());
172  }
173
174  public void close() throws IOException {
175    // TODO Auto-generated method stub
176
177  }
178
179  public void reduce(Text key, Iterator<Text> values,
180                     OutputCollector<Text, Text> output, Reporter reporter)
181    throws IOException {
182    String keyStr = key.toString() + this.fieldSeparator;
183    while (values.hasNext()) {
184        FieldSelectionHelper helper = new FieldSelectionHelper();
185        helper.extractOutputKeyValue(keyStr, values.next().toString(),
186          fieldSeparator, reduceOutputKeyFieldList,
187          reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
188      output.collect(helper.getKey(), helper.getValue());
189    }
190  }
191}