/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.fieldsel;

import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;

/**
 * This class implements a mapper/reducer helper that can be used to perform
 * field selections in a manner similar to unix cut. The input data is treated
 * as fields separated by a user specified separator (the default value is
 * "\t"). The user can specify a list of fields that form the map output keys,
 * and a list of fields that form the map output values. If the inputformat is
 * TextInputFormat, the mapper will ignore the key to the map function, and the
 * fields are taken from the value only. Otherwise, the fields are the union of
 * those from the key and those from the value.
 *
 * The field separator is under attribute
 * "mapreduce.fieldsel.data.field.separator".
 *
 * The map output field list spec is under attribute
 * "mapreduce.fieldsel.map.output.key.value.fields.spec".
 * The value is expected to be like "keyFieldsSpec:valueFieldsSpec".
 * key/valueFieldsSpec are comma (,) separated field specs:
 * fieldSpec,fieldSpec,fieldSpec ...
 * Each field spec can be a simple number (e.g. 5) specifying a specific field,
 * or a range (like 2-5) to specify a range of fields, or an open range
 * (like 3-) specifying all the fields starting from field 3. Open range field
 * specs apply to value fields only. They have no effect on the key fields.
 *
 * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields
 * 4,3,0 and 1 for keys, and use fields 6,5,1,2,3,7 and above for values.
 *
 * The reduce output field list spec is under attribute
 * "mapreduce.fieldsel.reduce.output.key.value.fields.spec".
 *
 * The reducer extracts output key/value pairs in a similar manner, except that
 * the key is never ignored.
 *
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FieldSelectionHelper {

  /** Shared empty {@link Text} instance. */
  public static Text emptyText = new Text("");

  /** Configuration attribute holding the field separator. */
  public static final String DATA_FIELD_SEPERATOR =
      "mapreduce.fieldsel.data.field.separator";

  /** Configuration attribute holding the map output key/value field spec. */
  public static final String MAP_OUTPUT_KEY_VALUE_SPEC =
      "mapreduce.fieldsel.map.output.key.value.fields.spec";

  /** Configuration attribute holding the reduce output key/value field spec. */
  public static final String REDUCE_OUTPUT_KEY_VALUE_SPEC =
      "mapreduce.fieldsel.reduce.output.key.value.fields.spec";


  /**
   * Extract the actual field numbers from the given field specs.
   * If a field spec is in the form of "n-" (like 3-), then n will be the
   * return value. Otherwise, -1 will be returned. Empty specs are skipped,
   * and a missing range start (like -4) is treated as 0. If several open
   * ranges are given, the last one wins.
   * @param fieldListSpec an array of field specs
   * @param fieldList an array of field numbers extracted from the specs.
   * @return number n if some field spec is in the form of "n-", -1 otherwise.
   * @throws NumberFormatException if a spec contains a non-numeric bound
   */
  private static int extractFields(String[] fieldListSpec,
      List<Integer> fieldList) {
    int allFieldsFrom = -1;
    for (String fieldSpec : fieldListSpec) {
      if (fieldSpec.length() == 0) {
        continue;
      }
      int pos = fieldSpec.indexOf('-');
      if (pos < 0) {
        // A plain field number.
        fieldList.add(Integer.valueOf(fieldSpec));
        continue;
      }
      String start = fieldSpec.substring(0, pos);
      String end = fieldSpec.substring(pos + 1);
      if (start.length() == 0) {
        start = "0";
      }
      if (end.length() == 0) {
        // Open range "n-": remember n, nothing is added to fieldList.
        allFieldsFrom = Integer.parseInt(start);
        continue;
      }
      // Closed range "a-b": both bounds are inclusive.
      int startPos = Integer.parseInt(start);
      int endPos = Integer.parseInt(end);
      for (int j = startPos; j <= endPos; j++) {
        fieldList.add(j);
      }
    }
    return allFieldsFrom;
  }

  /**
   * Join the selected fields into one string, separated by the separator.
   * Fields named in fieldList come first, in list order; an index at or
   * beyond the end of fields contributes an empty field. They are followed
   * by every field from allFieldsFrom to the end when allFieldsFrom is
   * non-negative.
   * @param fields the fields of the input record
   * @param fieldList indexes of the fields to select, may be null or empty
   * @param allFieldsFrom index of the first field of the open range,
   *        or a negative number if there is no open range
   * @param separator the string placed between two selected fields
   * @return the joined fields without a trailing separator, or null if
   *         fieldList is null or empty and there is no open range
   */
  private static String selectFields(String[] fields, List<Integer> fieldList,
      int allFieldsFrom, String separator) {
    StringBuilder sb = null;
    if (fieldList != null && fieldList.size() > 0) {
      sb = new StringBuilder();
      for (Integer index : fieldList) {
        if (index < fields.length) {
          sb.append(fields[index]);
        }
        sb.append(separator);
      }
    }
    if (allFieldsFrom >= 0) {
      if (sb == null) {
        sb = new StringBuilder();
      }
      for (int i = allFieldsFrom; i < fields.length; i++) {
        sb.append(fields[i]).append(separator);
      }
    }
    if (sb == null) {
      return null;
    }
    String retv = sb.toString();
    if (retv.length() > 0) {
      // Every append above ends with the separator, so a non-empty result
      // always ends with one complete separator. Strip all of it rather than
      // a single character, so multi-character and empty separators work.
      retv = retv.substring(0, retv.length() - separator.length());
    }
    return retv;
  }

  /**
   * Parse a "keyFieldsSpec:valueFieldsSpec" string into field number lists.
   * Anything after a second colon is ignored. An open range in the key spec
   * is parsed but has no effect.
   * @param keyValueSpec the spec to parse
   * @param keyFieldList receives the field numbers of the key spec
   * @param valueFieldList receives the field numbers of the value spec
   * @return number n if the value spec contains an open range "n-",
   *         -1 otherwise.
   * @throws NumberFormatException if a spec contains a non-numeric bound
   */
  public static int parseOutputKeyValueSpec(String keyValueSpec,
      List<Integer> keyFieldList, List<Integer> valueFieldList) {
    String[] keyValSpecs = keyValueSpec.split(":", -1);

    String[] keySpec = keyValSpecs[0].split(",");

    String[] valSpec = new String[0];
    if (keyValSpecs.length > 1) {
      valSpec = keyValSpecs[1].split(",");
    }

    FieldSelectionHelper.extractFields(keySpec, keyFieldList);
    return FieldSelectionHelper.extractFields(valSpec, valueFieldList);
  }

  /**
   * Build a multi-line, human readable description of a parsed spec.
   * @param fieldSeparator the field separator
   * @param keyValueSpec the unparsed key/value spec
   * @param allValueFieldsFrom start of the open range of value fields
   * @param keyFieldList the key field numbers
   * @param valueFieldList the value field numbers
   * @return the description, one setting or field number per line
   */
  public static String specToString(String fieldSeparator, String keyValueSpec,
      int allValueFieldsFrom, List<Integer> keyFieldList,
      List<Integer> valueFieldList) {
    StringBuilder sb = new StringBuilder();
    sb.append("fieldSeparator: ").append(fieldSeparator).append("\n");

    sb.append("keyValueSpec: ").append(keyValueSpec).append("\n");
    sb.append("allValueFieldsFrom: ").append(allValueFieldsFrom);
    sb.append("\n");
    sb.append("keyFieldList.length: ").append(keyFieldList.size());
    sb.append("\n");
    for (Integer field : keyFieldList) {
      sb.append("\t").append(field).append("\n");
    }
    sb.append("valueFieldList.length: ").append(valueFieldList.size());
    sb.append("\n");
    for (Integer field : valueFieldList) {
      sb.append("\t").append(field).append("\n");
    }
    return sb.toString();
  }

  private Text key = null;
  private Text value = null;

  /** Create a helper whose key and value are both null. */
  public FieldSelectionHelper() {
  }

  /**
   * Create a helper with an initial key and value.
   * @param key the initial key
   * @param val the initial value
   */
  public FieldSelectionHelper(Text key, Text val) {
    this.key = key;
    this.value = val;
  }

  /**
   * @return the current key, which may be null
   */
  public Text getKey() {
    return key;
  }

  /**
   * @return the current value, which may be null
   */
  public Text getValue() {
    return value;
  }

  /**
   * Select the output key and value fields from an input record and store
   * them in this helper. Unless ignoreKey is set, the record is key
   * immediately followed by val; no separator is inserted between them, so
   * the caller has to supply one if the key is to be a field of its own.
   * The record is split with {@link String#split(String)}, which means that
   * fieldSep is interpreted as a regular expression when splitting, while
   * it is used literally when the selected fields are joined.
   * The stored key and the stored value are each replaced only if the
   * corresponding selection is not null.
   * @param key the input key
   * @param val the input value
   * @param fieldSep the field separator
   * @param keyFieldList the numbers of the fields forming the output key
   * @param valFieldList the numbers of the fields forming the output value
   * @param allValueFieldsFrom start of the open range of value fields,
   *        or a negative number if there is none
   * @param ignoreKey true if the input key is not part of the record
   * @param isMap true to use the selected value as the output key when no
   *        key fields are selected; the output value is then left unchanged
   */
  public void extractOutputKeyValue(String key, String val,
      String fieldSep, List<Integer> keyFieldList, List<Integer> valFieldList,
      int allValueFieldsFrom, boolean ignoreKey, boolean isMap) {
    if (!ignoreKey) {
      val = key + val;
    }
    String[] fields = val.split(fieldSep);

    // Open ranges never apply to keys, hence the -1.
    String newKey = selectFields(fields, keyFieldList, -1, fieldSep);
    String newVal = selectFields(fields, valFieldList, allValueFieldsFrom,
        fieldSep);
    if (isMap && newKey == null) {
      newKey = newVal;
      newVal = null;
    }

    if (newKey != null) {
      this.key = new Text(newKey);
    }
    if (newVal != null) {
      this.value = new Text(newVal);
    }
  }
}