001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.fieldsel;
020    
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.List;
024    
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.apache.hadoop.classification.InterfaceAudience;
028    import org.apache.hadoop.classification.InterfaceStability;
029    import org.apache.hadoop.conf.Configuration;
030    import org.apache.hadoop.io.Text;
031    import org.apache.hadoop.mapreduce.Mapper;
032    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
033    
034    /**
035     * This class implements a mapper class that can be used to perform
036     * field selections in a manner similar to unix cut. The input data is treated
037     * as fields separated by a user specified separator (the default value is
038     * "\t"). The user can specify a list of fields that form the map output keys,
039     * and a list of fields that form the map output values. If the inputformat is
040     * TextInputFormat, the mapper will ignore the key to the map function. and the
041     * fields are from the value only. Otherwise, the fields are the union of those
042     * from the key and those from the value.
043     * 
044     * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
045     * 
046     * The map output field list spec is under attribute 
047     * "mapreduce.fieldsel.map.output.key.value.fields.spec". 
048     * The value is expected to be like
049     * "keyFieldsSpec:valueFieldsSpec" key/valueFieldsSpec are comma (,) separated
050     * field spec: fieldSpec,fieldSpec,fieldSpec ... Each field spec can be a 
051     * simple number (e.g. 5) specifying a specific field, or a range (like 2-5)
052     * to specify a range of fields, or an open range (like 3-) specifying all 
053     * the fields starting from field 3. The open range field spec applies value
054     * fields only. They have no effect on the key fields.
055     * 
056     * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields
057     * 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values.
058     */
059    @InterfaceAudience.Public
060    @InterfaceStability.Stable
061    public class FieldSelectionMapper<K, V>
062        extends Mapper<K, V, Text, Text> {
063    
064      private String mapOutputKeyValueSpec;
065    
066      private boolean ignoreInputKey;
067    
068      private String fieldSeparator = "\t";
069    
070      private List<Integer> mapOutputKeyFieldList = new ArrayList<Integer>();
071    
072      private List<Integer> mapOutputValueFieldList = new ArrayList<Integer>();
073    
074      private int allMapValueFieldsFrom = -1;
075    
076      public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
077    
078      public void setup(Context context) 
079          throws IOException, InterruptedException {
080        Configuration conf = context.getConfiguration();
081        this.fieldSeparator = 
082          conf.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
083        this.mapOutputKeyValueSpec = 
084          conf.get(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:");
085        try {
086          this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
087            context.getInputFormatClass().getCanonicalName());
088        } catch (ClassNotFoundException e) {
089          throw new IOException("Input format class not found", e);
090        }
091        allMapValueFieldsFrom = FieldSelectionHelper.parseOutputKeyValueSpec(
092          mapOutputKeyValueSpec, mapOutputKeyFieldList, mapOutputValueFieldList);
093        LOG.info(FieldSelectionHelper.specToString(fieldSeparator,
094          mapOutputKeyValueSpec, allMapValueFieldsFrom, mapOutputKeyFieldList,
095          mapOutputValueFieldList) + "\nignoreInputKey:" + ignoreInputKey);
096      }
097    
098      /**
099       * The identify function. Input key/value pair is written directly to output.
100       */
101      public void map(K key, V val, Context context) 
102          throws IOException, InterruptedException {
103        FieldSelectionHelper helper = new FieldSelectionHelper(
104          FieldSelectionHelper.emptyText, FieldSelectionHelper.emptyText);
105        helper.extractOutputKeyValue(key.toString(), val.toString(),
106          fieldSeparator, mapOutputKeyFieldList, mapOutputValueFieldList,
107          allMapValueFieldsFrom, ignoreInputKey, true);
108        context.write(helper.getKey(), helper.getValue());
109      }
110    }