001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.fieldsel;
020
021 import java.util.List;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.io.Text;
026
027 /**
028 * This class implements a mapper/reducer class that can be used to perform
029 * field selections in a manner similar to unix cut. The input data is treated
030 * as fields separated by a user specified separator (the default value is
031 * "\t"). The user can specify a list of fields that form the map output keys,
032 * and a list of fields that form the map output values. If the inputformat is
033 * TextInputFormat, the mapper will ignore the key to the map function. and the
034 * fields are from the value only. Otherwise, the fields are the union of those
035 * from the key and those from the value.
036 *
037 * The field separator is under attribute "mapreduce.fieldsel.data.field.separator"
038 *
039 * The map output field list spec is under attribute
040 * "mapreduce.fieldsel.map.output.key.value.fields.spec".
041 * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
042 * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
043 * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
044 * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all
045 * the fields starting from field 3. The open range field spec applies value fields only.
046 * They have no effect on the key fields.
047 *
048 * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
049 * and use fields 6,5,1,2,3,7 and above for values.
050 *
051 * The reduce output field list spec is under attribute
052 * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
053 *
054 * The reducer extracts output key/value pairs in a similar manner, except that
055 * the key is never ignored.
056 *
057 */
058 @InterfaceAudience.Public
059 @InterfaceStability.Stable
060 public class FieldSelectionHelper {
061
062 public static Text emptyText = new Text("");
063 public static final String DATA_FIELD_SEPERATOR =
064 "mapreduce.fieldsel.data.field.separator";
065 public static final String MAP_OUTPUT_KEY_VALUE_SPEC =
066 "mapreduce.fieldsel.map.output.key.value.fields.spec";
067 public static final String REDUCE_OUTPUT_KEY_VALUE_SPEC =
068 "mapreduce.fieldsel.reduce.output.key.value.fields.spec";
069
070
071 /**
072 * Extract the actual field numbers from the given field specs.
073 * If a field spec is in the form of "n-" (like 3-), then n will be the
074 * return value. Otherwise, -1 will be returned.
075 * @param fieldListSpec an array of field specs
076 * @param fieldList an array of field numbers extracted from the specs.
077 * @return number n if some field spec is in the form of "n-", -1 otherwise.
078 */
079 private static int extractFields(String[] fieldListSpec,
080 List<Integer> fieldList) {
081 int allFieldsFrom = -1;
082 int i = 0;
083 int j = 0;
084 int pos = -1;
085 String fieldSpec = null;
086 for (i = 0; i < fieldListSpec.length; i++) {
087 fieldSpec = fieldListSpec[i];
088 if (fieldSpec.length() == 0) {
089 continue;
090 }
091 pos = fieldSpec.indexOf('-');
092 if (pos < 0) {
093 Integer fn = new Integer(fieldSpec);
094 fieldList.add(fn);
095 } else {
096 String start = fieldSpec.substring(0, pos);
097 String end = fieldSpec.substring(pos + 1);
098 if (start.length() == 0) {
099 start = "0";
100 }
101 if (end.length() == 0) {
102 allFieldsFrom = Integer.parseInt(start);
103 continue;
104 }
105 int startPos = Integer.parseInt(start);
106 int endPos = Integer.parseInt(end);
107 for (j = startPos; j <= endPos; j++) {
108 fieldList.add(j);
109 }
110 }
111 }
112 return allFieldsFrom;
113 }
114
115 private static String selectFields(String[] fields, List<Integer> fieldList,
116 int allFieldsFrom, String separator) {
117 String retv = null;
118 int i = 0;
119 StringBuffer sb = null;
120 if (fieldList != null && fieldList.size() > 0) {
121 if (sb == null) {
122 sb = new StringBuffer();
123 }
124 for (Integer index : fieldList) {
125 if (index < fields.length) {
126 sb.append(fields[index]);
127 }
128 sb.append(separator);
129 }
130 }
131 if (allFieldsFrom >= 0) {
132 if (sb == null) {
133 sb = new StringBuffer();
134 }
135 for (i = allFieldsFrom; i < fields.length; i++) {
136 sb.append(fields[i]).append(separator);
137 }
138 }
139 if (sb != null) {
140 retv = sb.toString();
141 if (retv.length() > 0) {
142 retv = retv.substring(0, retv.length() - 1);
143 }
144 }
145 return retv;
146 }
147
148 public static int parseOutputKeyValueSpec(String keyValueSpec,
149 List<Integer> keyFieldList, List<Integer> valueFieldList) {
150 String[] keyValSpecs = keyValueSpec.split(":", -1);
151
152 String[] keySpec = keyValSpecs[0].split(",");
153
154 String[] valSpec = new String[0];
155 if (keyValSpecs.length > 1) {
156 valSpec = keyValSpecs[1].split(",");
157 }
158
159 FieldSelectionHelper.extractFields(keySpec, keyFieldList);
160 return FieldSelectionHelper.extractFields(valSpec, valueFieldList);
161 }
162
163 public static String specToString(String fieldSeparator, String keyValueSpec,
164 int allValueFieldsFrom, List<Integer> keyFieldList,
165 List<Integer> valueFieldList) {
166 StringBuffer sb = new StringBuffer();
167 sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");
168
169 sb.append("keyValueSpec: ").append(keyValueSpec).append("\n");
170 sb.append("allValueFieldsFrom: ").append(allValueFieldsFrom);
171 sb.append("\n");
172 sb.append("keyFieldList.length: ").append(keyFieldList.size());
173 sb.append("\n");
174 for (Integer field : keyFieldList) {
175 sb.append("\t").append(field).append("\n");
176 }
177 sb.append("valueFieldList.length: ").append(valueFieldList.size());
178 sb.append("\n");
179 for (Integer field : valueFieldList) {
180 sb.append("\t").append(field).append("\n");
181 }
182 return sb.toString();
183 }
184
185 private Text key = null;
186 private Text value = null;
187
188 public FieldSelectionHelper() {
189 }
190
191 public FieldSelectionHelper(Text key, Text val) {
192 this.key = key;
193 this.value = val;
194 }
195
196 public Text getKey() {
197 return key;
198 }
199
200 public Text getValue() {
201 return value;
202 }
203
204 public void extractOutputKeyValue(String key, String val,
205 String fieldSep, List<Integer> keyFieldList, List<Integer> valFieldList,
206 int allValueFieldsFrom, boolean ignoreKey, boolean isMap) {
207 if (!ignoreKey) {
208 val = key + val;
209 }
210 String[] fields = val.split(fieldSep);
211
212 String newKey = selectFields(fields, keyFieldList, -1, fieldSep);
213 String newVal = selectFields(fields, valFieldList, allValueFieldsFrom,
214 fieldSep);
215 if (isMap && newKey == null) {
216 newKey = newVal;
217 newVal = null;
218 }
219
220 if (newKey != null) {
221 this.key = new Text(newKey);
222 }
223 if (newVal != null) {
224 this.value = new Text(newVal);
225 }
226 }
227 }