001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.fieldsel;
020    
021    import java.util.List;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.io.Text;
026    
027    /**
028     * This class implements a mapper/reducer class that can be used to perform
029     * field selections in a manner similar to unix cut. The input data is treated
030     * as fields separated by a user specified separator (the default value is
031     * "\t"). The user can specify a list of fields that form the map output keys,
032     * and a list of fields that form the map output values. If the inputformat is
033     * TextInputFormat, the mapper will ignore the key to the map function. and the
034     * fields are from the value only. Otherwise, the fields are the union of those
035     * from the key and those from the value.
036     * 
037     * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
038     * 
039     * The map output field list spec is under attribute 
040     * "mapreduce.fieldsel.map.output.key.value.fields.spec".
041     * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
042     * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
043     * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
044     * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all 
045     * the fields starting from field 3. The open range field spec applies value fields only.
046     * They have no effect on the key fields.
047     * 
048     * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
049     * and use fields 6,5,1,2,3,7 and above for values.
050     * 
051     * The reduce output field list spec is under attribute 
052     * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
053     * 
054     * The reducer extracts output key/value pairs in a similar manner, except that
055     * the key is never ignored.
056     * 
057     */
058    @InterfaceAudience.Public
059    @InterfaceStability.Stable
060    public class FieldSelectionHelper {
061    
062      public static Text emptyText = new Text("");
063      public static final String DATA_FIELD_SEPERATOR = 
064        "mapreduce.fieldsel.data.field.separator";
065      public static final String MAP_OUTPUT_KEY_VALUE_SPEC = 
066        "mapreduce.fieldsel.map.output.key.value.fields.spec";
067      public static final String REDUCE_OUTPUT_KEY_VALUE_SPEC = 
068        "mapreduce.fieldsel.reduce.output.key.value.fields.spec";
069    
070    
071      /**
072       * Extract the actual field numbers from the given field specs.
073       * If a field spec is in the form of "n-" (like 3-), then n will be the 
074       * return value. Otherwise, -1 will be returned.  
075       * @param fieldListSpec an array of field specs
076       * @param fieldList an array of field numbers extracted from the specs.
077       * @return number n if some field spec is in the form of "n-", -1 otherwise.
078       */
079      private static int extractFields(String[] fieldListSpec,
080          List<Integer> fieldList) {
081        int allFieldsFrom = -1;
082        int i = 0;
083        int j = 0;
084        int pos = -1;
085        String fieldSpec = null;
086        for (i = 0; i < fieldListSpec.length; i++) {
087          fieldSpec = fieldListSpec[i];
088          if (fieldSpec.length() == 0) {
089            continue;
090          }
091          pos = fieldSpec.indexOf('-');
092          if (pos < 0) {
093            Integer fn = new Integer(fieldSpec);
094            fieldList.add(fn);
095          } else {
096            String start = fieldSpec.substring(0, pos);
097            String end = fieldSpec.substring(pos + 1);
098            if (start.length() == 0) {
099              start = "0";
100            }
101            if (end.length() == 0) {
102              allFieldsFrom = Integer.parseInt(start);
103              continue;
104            }
105            int startPos = Integer.parseInt(start);
106            int endPos = Integer.parseInt(end);
107            for (j = startPos; j <= endPos; j++) {
108              fieldList.add(j);
109            }
110          }
111        }
112        return allFieldsFrom;
113      }
114    
115      private static String selectFields(String[] fields, List<Integer> fieldList,
116          int allFieldsFrom, String separator) {
117        String retv = null;
118        int i = 0;
119        StringBuffer sb = null;
120        if (fieldList != null && fieldList.size() > 0) {
121          if (sb == null) {
122            sb = new StringBuffer();
123          }
124          for (Integer index : fieldList) {
125            if (index < fields.length) {
126              sb.append(fields[index]);
127            }
128            sb.append(separator);
129          }
130        }
131        if (allFieldsFrom >= 0) {
132          if (sb == null) {
133            sb = new StringBuffer();
134          }
135          for (i = allFieldsFrom; i < fields.length; i++) {
136            sb.append(fields[i]).append(separator);
137          }
138        }
139        if (sb != null) {
140          retv = sb.toString();
141          if (retv.length() > 0) {
142            retv = retv.substring(0, retv.length() - 1);
143          }
144        }
145        return retv;
146      }
147      
148      public static int parseOutputKeyValueSpec(String keyValueSpec,
149          List<Integer> keyFieldList, List<Integer> valueFieldList) {
150        String[] keyValSpecs = keyValueSpec.split(":", -1);
151        
152        String[] keySpec = keyValSpecs[0].split(",");
153        
154        String[] valSpec = new String[0];
155        if (keyValSpecs.length > 1) {
156          valSpec = keyValSpecs[1].split(",");
157        }
158    
159        FieldSelectionHelper.extractFields(keySpec, keyFieldList);
160        return FieldSelectionHelper.extractFields(valSpec, valueFieldList);
161      }
162    
163      public static String specToString(String fieldSeparator, String keyValueSpec,
164          int allValueFieldsFrom, List<Integer> keyFieldList,
165          List<Integer> valueFieldList) {
166        StringBuffer sb = new StringBuffer();
167        sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
168    
169        sb.append("keyValueSpec: ").append(keyValueSpec).append("\n");
170        sb.append("allValueFieldsFrom: ").append(allValueFieldsFrom);
171        sb.append("\n");
172        sb.append("keyFieldList.length: ").append(keyFieldList.size());
173        sb.append("\n");
174        for (Integer field : keyFieldList) {
175          sb.append("\t").append(field).append("\n");
176        }
177        sb.append("valueFieldList.length: ").append(valueFieldList.size());
178        sb.append("\n");
179        for (Integer field : valueFieldList) {
180          sb.append("\t").append(field).append("\n");
181        }
182        return sb.toString();
183      }
184    
185      private Text key = null;
186      private Text value = null;
187      
188      public FieldSelectionHelper() {
189      }
190    
191      public FieldSelectionHelper(Text key, Text val) {
192        this.key = key;
193        this.value = val;
194      }
195      
196      public Text getKey() {
197        return key;
198      }
199     
200      public Text getValue() {
201        return value;
202      }
203    
204      public void extractOutputKeyValue(String key, String val,
205          String fieldSep, List<Integer> keyFieldList, List<Integer> valFieldList,
206          int allValueFieldsFrom, boolean ignoreKey, boolean isMap) {
207        if (!ignoreKey) {
208          val = key + val;
209        }
210        String[] fields = val.split(fieldSep);
211        
212        String newKey = selectFields(fields, keyFieldList, -1, fieldSep);
213        String newVal = selectFields(fields, valFieldList, allValueFieldsFrom,
214          fieldSep);
215        if (isMap && newKey == null) {
216          newKey = newVal;
217          newVal = null;
218        }
219        
220        if (newKey != null) {
221          this.key = new Text(newKey);
222        }
223        if (newVal != null) {
224          this.value = new Text(newVal);
225        }
226      }
227    }