001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapred.lib;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Iterator;
024    import java.util.List;
025    
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.io.Text;
031    import org.apache.hadoop.mapred.JobConf;
032    import org.apache.hadoop.mapred.Mapper;
033    import org.apache.hadoop.mapred.OutputCollector;
034    import org.apache.hadoop.mapred.Reducer;
035    import org.apache.hadoop.mapred.Reporter;
036    import org.apache.hadoop.mapred.TextInputFormat;
037    import org.apache.hadoop.mapreduce.lib.fieldsel.*;
038    
039    /**
040     * This class implements a mapper/reducer class that can be used to perform
041     * field selections in a manner similar to unix cut. The input data is treated
042     * as fields separated by a user specified separator (the default value is
043     * "\t"). The user can specify a list of fields that form the map output keys,
 * and a list of fields that form the map output values. If the input format is
 * TextInputFormat, the mapper ignores the key passed to the map function, and
 * the fields are taken from the value only. Otherwise, the fields are the union
 * of those from the key and those from the value.
048     * 
049     * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
050     * 
051     * The map output field list spec is under attribute 
052     * "mapreduce.fieldsel.map.output.key.value.fields.spec".
053     * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
054     * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
055     * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
056     * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all 
 * the fields starting from field 3. An open range field spec applies to value fields only;
 * it has no effect on the key fields.
059     * 
060     * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
061     * and use fields 6,5,1,2,3,7 and above for values.
062     * 
063     * The reduce output field list spec is under attribute 
064     * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
065     * 
066     * The reducer extracts output key/value pairs in a similar manner, except that
067     * the key is never ignored.
068     */
069    @InterfaceAudience.Public
070    @InterfaceStability.Stable
071    public class FieldSelectionMapReduce<K, V>
072        implements Mapper<K, V, Text, Text>, Reducer<Text, Text, Text, Text> {
073    
074      private String mapOutputKeyValueSpec;
075    
076      private boolean ignoreInputKey;
077    
078      private String fieldSeparator = "\t";
079    
080      private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
081    
082      private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
083    
084      private int allMapValueFieldsFrom = -1;
085    
086      private String reduceOutputKeyValueSpec;
087    
088      private List<Integer> reduceOutputKeyFieldList = new ArrayList<Integer>();
089    
090      private List<Integer> reduceOutputValueFieldList = new ArrayList<Integer>();
091    
092      private int allReduceValueFieldsFrom = -1;
093    
094    
095      public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
096    
097      private String specToString() {
098        StringBuffer sb = new StringBuffer();
099        sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
100    
101        sb.append("mapOutputKeyValueSpec: ").append(mapOutputKeyValueSpec).append(
102            "\n");
103        sb.append("reduceOutputKeyValueSpec: ").append(reduceOutputKeyValueSpec)
104            .append("\n");
105    
106        sb.append("allMapValueFieldsFrom: ").append(allMapValueFieldsFrom).append(
107            "\n");
108    
109        sb.append("allReduceValueFieldsFrom: ").append(allReduceValueFieldsFrom)
110            .append("\n");
111    
112        int i = 0;
113    
114        sb.append("mapOutputKeyFieldList.length: ").append(
115            mapOutputKeyFieldList.size()).append("\n");
116        for (i = 0; i < mapOutputKeyFieldList.size(); i++) {
117          sb.append("\t").append(mapOutputKeyFieldList.get(i)).append("\n");
118        }
119        sb.append("mapOutputValueFieldList.length: ").append(
120            mapOutputValueFieldList.size()).append("\n");
121        for (i = 0; i < mapOutputValueFieldList.size(); i++) {
122          sb.append("\t").append(mapOutputValueFieldList.get(i)).append("\n");
123        }
124    
125        sb.append("reduceOutputKeyFieldList.length: ").append(
126            reduceOutputKeyFieldList.size()).append("\n");
127        for (i = 0; i < reduceOutputKeyFieldList.size(); i++) {
128          sb.append("\t").append(reduceOutputKeyFieldList.get(i)).append("\n");
129        }
130        sb.append("reduceOutputValueFieldList.length: ").append(
131            reduceOutputValueFieldList.size()).append("\n");
132        for (i = 0; i < reduceOutputValueFieldList.size(); i++) {
133          sb.append("\t").append(reduceOutputValueFieldList.get(i)).append("\n");
134        }
135        return sb.toString();
136      }
137    
138      /**
139       * The identify function. Input key/value pair is written directly to output.
140       */
141      public void map(K key, V val,
142          OutputCollector<Text, Text> output, Reporter reporter) 
143          throws IOException {
144        FieldSelectionHelper helper = new FieldSelectionHelper(
145          FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
146        helper.extractOutputKeyValue(key.toString(), val.toString(),
147          fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
148          allMapValueFieldsFrom, ignoreInputKey, true);
149        output.collect(helper.getKey(), helper.getValue());
150      }
151    
152      private void parseOutputKeyValueSpec() {
153        allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
154          mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
155        
156        allReduceValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
157          reduceOutputKeyValueSpec, reduceOutputKeyFieldList,
158          reduceOutputValueFieldList);
159      }
160    
161      public void configure(JobConf job) {
162        this.fieldSeparator = job.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR,
163            "\t");
164        this.mapOutputKeyValueSpec = job.get(
165            FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
166        this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
167            job.getInputFormat().getClass().getCanonicalName());
168        this.reduceOutputKeyValueSpec = job.get(
169            FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");
170        parseOutputKeyValueSpec();
171        LOG.info(specToString());
172      }
173    
174      public void close() throws IOException {
175        // TODO Auto-generated method stub
176    
177      }
178    
179      public void reduce(Text key, Iterator<Text> values,
180                         OutputCollector<Text, Text> output, Reporter reporter)
181        throws IOException {
182        String keyStr = key.toString() + this.fieldSeparator;
183        while (values.hasNext()) {
184            FieldSelectionHelper helper = new FieldSelectionHelper();
185            helper.extractOutputKeyValue(keyStr, values.next().toString(),
186              fieldSeparator, reduceOutputKeyFieldList,
187              reduceOutputValueFieldList, allReduceValueFieldsFrom, false, false);
188          output.collect(helper.getKey(), helper.getValue());
189        }
190      }
191    }