001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.fieldsel;
020
021import java.util.List;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.io.Text;
026
027/**
028 * This class implements a mapper/reducer class that can be used to perform
029 * field selections in a manner similar to unix cut. The input data is treated
030 * as fields separated by a user specified separator (the default value is
031 * "\t"). The user can specify a list of fields that form the map output keys,
032 * and a list of fields that form the map output values. If the inputformat is
033 * TextInputFormat, the mapper will ignore the key to the map function. and the
034 * fields are from the value only. Otherwise, the fields are the union of those
035 * from the key and those from the value.
036 * 
037 * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
038 * 
039 * The map output field list spec is under attribute 
040 * "mapreduce.fieldsel.map.output.key.value.fields.spec".
041 * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
042 * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
043 * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
044 * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all 
045 * the fields starting from field 3. The open range field spec applies value fields only.
046 * They have no effect on the key fields.
047 * 
048 * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
049 * and use fields 6,5,1,2,3,7 and above for values.
050 * 
051 * The reduce output field list spec is under attribute 
052 * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
053 * 
054 * The reducer extracts output key/value pairs in a similar manner, except that
055 * the key is never ignored.
056 * 
057 */
058@InterfaceAudience.Public
059@InterfaceStability.Stable
060public class FieldSelectionHelper {
061
062  public static Text emptyText = new Text("");
063  public static final String DATA_FIELD_SEPERATOR = 
064    "mapreduce.fieldsel.data.field.separator";
065  public static final String MAP_OUTPUT_KEY_VALUE_SPEC = 
066    "mapreduce.fieldsel.map.output.key.value.fields.spec";
067  public static final String REDUCE_OUTPUT_KEY_VALUE_SPEC = 
068    "mapreduce.fieldsel.reduce.output.key.value.fields.spec";
069
070
071  /**
072   * Extract the actual field numbers from the given field specs.
073   * If a field spec is in the form of "n-" (like 3-), then n will be the 
074   * return value. Otherwise, -1 will be returned.  
075   * @param fieldListSpec an array of field specs
076   * @param fieldList an array of field numbers extracted from the specs.
077   * @return number n if some field spec is in the form of "n-", -1 otherwise.
078   */
079  private static int extractFields(String[] fieldListSpec,
080      List<Integer> fieldList) {
081    int allFieldsFrom = -1;
082    int i = 0;
083    int j = 0;
084    int pos = -1;
085    String fieldSpec = null;
086    for (i = 0; i < fieldListSpec.length; i++) {
087      fieldSpec = fieldListSpec[i];
088      if (fieldSpec.length() == 0) {
089        continue;
090      }
091      pos = fieldSpec.indexOf('-');
092      if (pos < 0) {
093        Integer fn = Integer.valueOf(fieldSpec);
094        fieldList.add(fn);
095      } else {
096        String start = fieldSpec.substring(0, pos);
097        String end = fieldSpec.substring(pos + 1);
098        if (start.length() == 0) {
099          start = "0";
100        }
101        if (end.length() == 0) {
102          allFieldsFrom = Integer.parseInt(start);
103          continue;
104        }
105        int startPos = Integer.parseInt(start);
106        int endPos = Integer.parseInt(end);
107        for (j = startPos; j <= endPos; j++) {
108          fieldList.add(j);
109        }
110      }
111    }
112    return allFieldsFrom;
113  }
114
115  private static String selectFields(String[] fields, List<Integer> fieldList,
116      int allFieldsFrom, String separator) {
117    String retv = null;
118    int i = 0;
119    StringBuffer sb = null;
120    if (fieldList != null && fieldList.size() > 0) {
121      if (sb == null) {
122        sb = new StringBuffer();
123      }
124      for (Integer index : fieldList) {
125        if (index < fields.length) {
126          sb.append(fields[index]);
127        }
128        sb.append(separator);
129      }
130    }
131    if (allFieldsFrom >= 0) {
132      if (sb == null) {
133        sb = new StringBuffer();
134      }
135      for (i = allFieldsFrom; i < fields.length; i++) {
136        sb.append(fields[i]).append(separator);
137      }
138    }
139    if (sb != null) {
140      retv = sb.toString();
141      if (retv.length() > 0) {
142        retv = retv.substring(0, retv.length() - 1);
143      }
144    }
145    return retv;
146  }
147  
148  public static int parseOutputKeyValueSpec(String keyValueSpec,
149      List<Integer> keyFieldList, List<Integer> valueFieldList) {
150    String[] keyValSpecs = keyValueSpec.split(":", -1);
151    
152    String[] keySpec = keyValSpecs[0].split(",");
153    
154    String[] valSpec = new String[0];
155    if (keyValSpecs.length > 1) {
156      valSpec = keyValSpecs[1].split(",");
157    }
158
159    FieldSelectionHelper.extractFields(keySpec, keyFieldList);
160    return FieldSelectionHelper.extractFields(valSpec, valueFieldList);
161  }
162
163  public static String specToString(String fieldSeparator, String keyValueSpec,
164      int allValueFieldsFrom, List<Integer> keyFieldList,
165      List<Integer> valueFieldList) {
166    StringBuffer sb = new StringBuffer();
167    sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
168
169    sb.append("keyValueSpec: ").append(keyValueSpec).append("\n");
170    sb.append("allValueFieldsFrom: ").append(allValueFieldsFrom);
171    sb.append("\n");
172    sb.append("keyFieldList.length: ").append(keyFieldList.size());
173    sb.append("\n");
174    for (Integer field : keyFieldList) {
175      sb.append("\t").append(field).append("\n");
176    }
177    sb.append("valueFieldList.length: ").append(valueFieldList.size());
178    sb.append("\n");
179    for (Integer field : valueFieldList) {
180      sb.append("\t").append(field).append("\n");
181    }
182    return sb.toString();
183  }
184
185  private Text key = null;
186  private Text value = null;
187  
188  public FieldSelectionHelper() {
189  }
190
191  public FieldSelectionHelper(Text key, Text val) {
192    this.key = key;
193    this.value = val;
194  }
195  
196  public Text getKey() {
197    return key;
198  }
199 
200  public Text getValue() {
201    return value;
202  }
203
204  public void extractOutputKeyValue(String key, String val,
205      String fieldSep, List<Integer> keyFieldList, List<Integer> valFieldList,
206      int allValueFieldsFrom, boolean ignoreKey, boolean isMap) {
207    if (!ignoreKey) {
208      val = key + val;
209    }
210    String[] fields = val.split(fieldSep);
211    
212    String newKey = selectFields(fields, keyFieldList, -1, fieldSep);
213    String newVal = selectFields(fields, valFieldList, allValueFieldsFrom,
214      fieldSep);
215    if (isMap && newKey == null) {
216      newKey = newVal;
217      newVal = null;
218    }
219    
220    if (newKey != null) {
221      this.key = new Text(newKey);
222    }
223    if (newVal != null) {
224      this.value = new Text(newVal);
225    }
226  }
227}