001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.lib.partition;
020
021 import java.util.List;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configurable;
026 import org.apache.hadoop.conf.Configuration;
027 import org.apache.hadoop.io.WritableComparator;
028 import org.apache.hadoop.io.WritableUtils;
029 import org.apache.hadoop.io.Text;
030 import org.apache.hadoop.mapreduce.Job;
031 import org.apache.hadoop.mapreduce.JobContext;
032 import org.apache.hadoop.mapreduce.MRJobConfig;
033 import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
034
035
036 /**
037 * This comparator implementation provides a subset of the features provided
038 * by the Unix/GNU Sort. In particular, the supported features are:
039 * -n, (Sort numerically)
040 * -r, (Reverse the result of comparison)
041 * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
042 * of the field to use, and c is the number of the first character from the
043 * beginning of the field. Fields and character posns are numbered starting
044 * with 1; a character position of zero in pos2 indicates the field's last
045 * character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
046 * of the field); if omitted from pos2, it defaults to 0 (the end of the
047 * field). opts are ordering options (any of 'nr' as described above).
048 * We assume that the fields in the key are separated by
049 * {@link JobContext#MAP_OUTPUT_KEY_FIELD_SEPERATOR}.
050 */
051 @InterfaceAudience.Public
052 @InterfaceStability.Stable
053 public class KeyFieldBasedComparator<K, V> extends WritableComparator
054 implements Configurable {
055 private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
056 public static String COMPARATOR_OPTIONS = "mapreduce.partition.keycomparator.options";
057 private static final byte NEGATIVE = (byte)'-';
058 private static final byte ZERO = (byte)'0';
059 private static final byte DECIMAL = (byte)'.';
060 private Configuration conf;
061
062 public void setConf(Configuration conf) {
063 this.conf = conf;
064 String option = conf.get(COMPARATOR_OPTIONS);
065 String keyFieldSeparator = conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR,"\t");
066 keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
067 keyFieldHelper.parseOption(option);
068 }
069
070 public Configuration getConf() {
071 return conf;
072 }
073
074 public KeyFieldBasedComparator() {
075 super(Text.class);
076 }
077
078 public int compare(byte[] b1, int s1, int l1,
079 byte[] b2, int s2, int l2) {
080 int n1 = WritableUtils.decodeVIntSize(b1[s1]);
081 int n2 = WritableUtils.decodeVIntSize(b2[s2]);
082 List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
083
084 if (allKeySpecs.size() == 0) {
085 return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
086 }
087
088 int []lengthIndicesFirst =
089 keyFieldHelper.getWordLengths(b1, s1 + n1, s1 + l1);
090 int []lengthIndicesSecond =
091 keyFieldHelper.getWordLengths(b2, s2 + n2, s2 + l2);
092
093 for (KeyDescription keySpec : allKeySpecs) {
094 int startCharFirst = keyFieldHelper.getStartOffset(b1, s1 + n1, s1 + l1,
095 lengthIndicesFirst, keySpec);
096 int endCharFirst = keyFieldHelper.getEndOffset(b1, s1 + n1, s1 + l1,
097 lengthIndicesFirst, keySpec);
098 int startCharSecond = keyFieldHelper.getStartOffset(b2, s2 + n2, s2 + l2,
099 lengthIndicesSecond, keySpec);
100 int endCharSecond = keyFieldHelper.getEndOffset(b2, s2 + n2, s2 + l2,
101 lengthIndicesSecond, keySpec);
102 int result;
103 if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2,
104 startCharSecond, endCharSecond, keySpec)) != 0) {
105 return result;
106 }
107 }
108 return 0;
109 }
110
111 private int compareByteSequence(byte[] first, int start1, int end1,
112 byte[] second, int start2, int end2, KeyDescription key) {
113 if (start1 == -1) {
114 if (key.reverse) {
115 return 1;
116 }
117 return -1;
118 }
119 if (start2 == -1) {
120 if (key.reverse) {
121 return -1;
122 }
123 return 1;
124 }
125 int compareResult = 0;
126 if (!key.numeric) {
127 compareResult = compareBytes(first, start1, end1-start1 + 1, second,
128 start2, end2 - start2 + 1);
129 }
130 if (key.numeric) {
131 compareResult = numericalCompare (first, start1, end1, second, start2,
132 end2);
133 }
134 if (key.reverse) {
135 return -compareResult;
136 }
137 return compareResult;
138 }
139
140 private int numericalCompare (byte[] a, int start1, int end1,
141 byte[] b, int start2, int end2) {
142 int i = start1;
143 int j = start2;
144 int mul = 1;
145 byte first_a = a[i];
146 byte first_b = b[j];
147 if (first_a == NEGATIVE) {
148 if (first_b != NEGATIVE) {
149 //check for cases like -0.0 and 0.0 (they should be declared equal)
150 return oneNegativeCompare(a, start1 + 1, end1, b, start2, end2);
151 }
152 i++;
153 }
154 if (first_b == NEGATIVE) {
155 if (first_a != NEGATIVE) {
156 //check for cases like 0.0 and -0.0 (they should be declared equal)
157 return -oneNegativeCompare(b, start2+1, end2, a, start1, end1);
158 }
159 j++;
160 }
161 if (first_b == NEGATIVE && first_a == NEGATIVE) {
162 mul = -1;
163 }
164
165 //skip over ZEROs
166 while (i <= end1) {
167 if (a[i] != ZERO) {
168 break;
169 }
170 i++;
171 }
172 while (j <= end2) {
173 if (b[j] != ZERO) {
174 break;
175 }
176 j++;
177 }
178
179 //skip over equal characters and stopping at the first nondigit char
180 //The nondigit character could be '.'
181 while (i <= end1 && j <= end2) {
182 if (!isdigit(a[i]) || a[i] != b[j]) {
183 break;
184 }
185 i++; j++;
186 }
187 if (i <= end1) {
188 first_a = a[i];
189 }
190 if (j <= end2) {
191 first_b = b[j];
192 }
193 //store the result of the difference. This could be final result if the
194 //number of digits in the mantissa is the same in both the numbers
195 int firstResult = first_a - first_b;
196
197 //check whether we hit a decimal in the earlier scan
198 if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) ||
199 (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) {
200 return ((mul < 0) ? -decimalCompare(a, i, end1, b, j, end2) :
201 decimalCompare(a, i, end1, b, j, end2));
202 }
203 //check the number of digits in the mantissa of the numbers
204 int numRemainDigits_a = 0;
205 int numRemainDigits_b = 0;
206 while (i <= end1) {
207 //if we encounter a non-digit treat the corresponding number as being
208 //smaller
209 if (isdigit(a[i++])) {
210 numRemainDigits_a++;
211 } else break;
212 }
213 while (j <= end2) {
214 //if we encounter a non-digit treat the corresponding number as being
215 //smaller
216 if (isdigit(b[j++])) {
217 numRemainDigits_b++;
218 } else break;
219 }
220 int ret = numRemainDigits_a - numRemainDigits_b;
221 if (ret == 0) {
222 return ((mul < 0) ? -firstResult : firstResult);
223 } else {
224 return ((mul < 0) ? -ret : ret);
225 }
226 }
227 private boolean isdigit(byte b) {
228 if ('0' <= b && b <= '9') {
229 return true;
230 }
231 return false;
232 }
233 private int decimalCompare(byte[] a, int i, int end1,
234 byte[] b, int j, int end2) {
235 if (i > end1) {
236 //if a[] has nothing remaining
237 return -decimalCompare1(b, ++j, end2);
238 }
239 if (j > end2) {
240 //if b[] has nothing remaining
241 return decimalCompare1(a, ++i, end1);
242 }
243 if (a[i] == DECIMAL && b[j] == DECIMAL) {
244 while (i <= end1 && j <= end2) {
245 if (a[i] != b[j]) {
246 if (isdigit(a[i]) && isdigit(b[j])) {
247 return a[i] - b[j];
248 }
249 if (isdigit(a[i])) {
250 return 1;
251 }
252 if (isdigit(b[j])) {
253 return -1;
254 }
255 return 0;
256 }
257 i++; j++;
258 }
259 if (i > end1 && j > end2) {
260 return 0;
261 }
262
263 if (i > end1) {
264 //check whether there is a non-ZERO digit after potentially
265 //a number of ZEROs (e.g., a=.4444, b=.444400004)
266 return -decimalCompare1(b, j, end2);
267 }
268 if (j > end2) {
269 //check whether there is a non-ZERO digit after potentially
270 //a number of ZEROs (e.g., b=.4444, a=.444400004)
271 return decimalCompare1(a, i, end1);
272 }
273 }
274 else if (a[i] == DECIMAL) {
275 return decimalCompare1(a, ++i, end1);
276 }
277 else if (b[j] == DECIMAL) {
278 return -decimalCompare1(b, ++j, end2);
279 }
280 return 0;
281 }
282
283 private int decimalCompare1(byte[] a, int i, int end) {
284 while (i <= end) {
285 if (a[i] == ZERO) {
286 i++;
287 continue;
288 }
289 if (isdigit(a[i])) {
290 return 1;
291 } else {
292 return 0;
293 }
294 }
295 return 0;
296 }
297
298 private int oneNegativeCompare(byte[] a, int start1, int end1,
299 byte[] b, int start2, int end2) {
300 //here a[] is negative and b[] is positive
301 //We have to ascertain whether the number contains any digits.
302 //If it does, then it is a smaller number for sure. If not,
303 //then we need to scan b[] to find out whether b[] has a digit
304 //If b[] does contain a digit, then b[] is certainly
305 //greater. If not, that is, both a[] and b[] don't contain
306 //digits then they should be considered equal.
307 if (!isZero(a, start1, end1)) {
308 return -1;
309 }
310 //reached here - this means that a[] is a ZERO
311 if (!isZero(b, start2, end2)) {
312 return -1;
313 }
314 //reached here - both numbers are basically ZEROs and hence
315 //they should compare equal
316 return 0;
317 }
318
319 private boolean isZero(byte a[], int start, int end) {
320 //check for zeros in the significand part as well as the decimal part
321 //note that we treat the non-digit characters as ZERO
322 int i = start;
323 //we check the significand for being a ZERO
324 while (i <= end) {
325 if (a[i] != ZERO) {
326 if (a[i] != DECIMAL && isdigit(a[i])) {
327 return false;
328 }
329 break;
330 }
331 i++;
332 }
333
334 if (i != (end+1) && a[i++] == DECIMAL) {
335 //we check the decimal part for being a ZERO
336 while (i <= end) {
337 if (a[i] != ZERO) {
338 if (isdigit(a[i])) {
339 return false;
340 }
341 break;
342 }
343 i++;
344 }
345 }
346 return true;
347 }
348 /**
349 * Set the {@link KeyFieldBasedComparator} options used to compare keys.
350 *
351 * @param keySpec the key specification of the form -k pos1[,pos2], where,
352 * pos is of the form f[.c][opts], where f is the number
353 * of the key field to use, and c is the number of the first character from
354 * the beginning of the field. Fields and character posns are numbered
355 * starting with 1; a character position of zero in pos2 indicates the
356 * field's last character. If '.c' is omitted from pos1, it defaults to 1
357 * (the beginning of the field); if omitted from pos2, it defaults to 0
358 * (the end of the field). opts are ordering options. The supported options
359 * are:
360 * -n, (Sort numerically)
361 * -r, (Reverse the result of comparison)
362 */
363 public static void setKeyFieldComparatorOptions(Job job, String keySpec) {
364 job.getConfiguration().set(COMPARATOR_OPTIONS, keySpec);
365 }
366
367 /**
368 * Get the {@link KeyFieldBasedComparator} options
369 */
370 public static String getKeyFieldComparatorOption(JobContext job) {
371 return job.getConfiguration().get(COMPARATOR_OPTIONS);
372 }
373
374
375 }