001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.mapreduce.lib.partition; 020 021 import java.util.List; 022 023 import org.apache.hadoop.classification.InterfaceAudience; 024 import org.apache.hadoop.classification.InterfaceStability; 025 import org.apache.hadoop.conf.Configurable; 026 import org.apache.hadoop.conf.Configuration; 027 import org.apache.hadoop.io.WritableComparator; 028 import org.apache.hadoop.io.WritableUtils; 029 import org.apache.hadoop.io.Text; 030 import org.apache.hadoop.mapreduce.Job; 031 import org.apache.hadoop.mapreduce.JobContext; 032 import org.apache.hadoop.mapreduce.MRJobConfig; 033 import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription; 034 035 036 /** 037 * This comparator implementation provides a subset of the features provided 038 * by the Unix/GNU Sort. In particular, the supported features are: 039 * -n, (Sort numerically) 040 * -r, (Reverse the result of comparison) 041 * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number 042 * of the field to use, and c is the number of the first character from the 043 * beginning of the field. Fields and character posns are numbered starting 044 * with 1; a character position of zero in pos2 indicates the field's last 045 * character. If '.c' is omitted from pos1, it defaults to 1 (the beginning 046 * of the field); if omitted from pos2, it defaults to 0 (the end of the 047 * field). opts are ordering options (any of 'nr' as described above). 048 * We assume that the fields in the key are separated by 049 * {@link JobContext#MAP_OUTPUT_KEY_FIELD_SEPERATOR}. 050 */ 051 @InterfaceAudience.Public 052 @InterfaceStability.Stable 053 public class KeyFieldBasedComparator<K, V> extends WritableComparator 054 implements Configurable { 055 private KeyFieldHelper keyFieldHelper = new KeyFieldHelper(); 056 public static String COMPARATOR_OPTIONS = "mapreduce.partition.keycomparator.options"; 057 private static final byte NEGATIVE = (byte)'-'; 058 private static final byte ZERO = (byte)'0'; 059 private static final byte DECIMAL = (byte)'.'; 060 private Configuration conf; 061 062 public void setConf(Configuration conf) { 063 this.conf = conf; 064 String option = conf.get(COMPARATOR_OPTIONS); 065 String keyFieldSeparator = conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR,"\t"); 066 keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator); 067 keyFieldHelper.parseOption(option); 068 } 069 070 public Configuration getConf() { 071 return conf; 072 } 073 074 public KeyFieldBasedComparator() { 075 super(Text.class); 076 } 077 078 public int compare(byte[] b1, int s1, int l1, 079 byte[] b2, int s2, int l2) { 080 int n1 = WritableUtils.decodeVIntSize(b1[s1]); 081 int n2 = WritableUtils.decodeVIntSize(b2[s2]); 082 List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs(); 083 084 if (allKeySpecs.size() == 0) { 085 return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2); 086 } 087 088 int []lengthIndicesFirst = 089 keyFieldHelper.getWordLengths(b1, s1 + n1, s1 + l1); 090 int []lengthIndicesSecond = 091 keyFieldHelper.getWordLengths(b2, s2 + n2, s2 + l2); 092 093 for (KeyDescription keySpec : allKeySpecs) { 094 int startCharFirst = keyFieldHelper.getStartOffset(b1, s1 + n1, s1 + l1, 095 lengthIndicesFirst, keySpec); 096 int endCharFirst = keyFieldHelper.getEndOffset(b1, s1 + n1, s1 + l1, 097 lengthIndicesFirst, keySpec); 098 int startCharSecond = keyFieldHelper.getStartOffset(b2, s2 + n2, s2 + l2, 099 lengthIndicesSecond, keySpec); 100 int endCharSecond = keyFieldHelper.getEndOffset(b2, s2 + n2, s2 + l2, 101 lengthIndicesSecond, keySpec); 102 int result; 103 if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2, 104 startCharSecond, endCharSecond, keySpec)) != 0) { 105 return result; 106 } 107 } 108 return 0; 109 } 110 111 private int compareByteSequence(byte[] first, int start1, int end1, 112 byte[] second, int start2, int end2, KeyDescription key) { 113 if (start1 == -1) { 114 if (key.reverse) { 115 return 1; 116 } 117 return -1; 118 } 119 if (start2 == -1) { 120 if (key.reverse) { 121 return -1; 122 } 123 return 1; 124 } 125 int compareResult = 0; 126 if (!key.numeric) { 127 compareResult = compareBytes(first, start1, end1-start1 + 1, second, 128 start2, end2 - start2 + 1); 129 } 130 if (key.numeric) { 131 compareResult = numericalCompare (first, start1, end1, second, start2, 132 end2); 133 } 134 if (key.reverse) { 135 return -compareResult; 136 } 137 return compareResult; 138 } 139 140 private int numericalCompare (byte[] a, int start1, int end1, 141 byte[] b, int start2, int end2) { 142 int i = start1; 143 int j = start2; 144 int mul = 1; 145 byte first_a = a[i]; 146 byte first_b = b[j]; 147 if (first_a == NEGATIVE) { 148 if (first_b != NEGATIVE) { 149 //check for cases like -0.0 and 0.0 (they should be declared equal) 150 return oneNegativeCompare(a, start1 + 1, end1, b, start2, end2); 151 } 152 i++; 153 } 154 if (first_b == NEGATIVE) { 155 if (first_a != NEGATIVE) { 156 //check for cases like 0.0 and -0.0 (they should be declared equal) 157 return -oneNegativeCompare(b, start2+1, end2, a, start1, end1); 158 } 159 j++; 160 } 161 if (first_b == NEGATIVE && first_a == NEGATIVE) { 162 mul = -1; 163 } 164 165 //skip over ZEROs 166 while (i <= end1) { 167 if (a[i] != ZERO) { 168 break; 169 } 170 i++; 171 } 172 while (j <= end2) { 173 if (b[j] != ZERO) { 174 break; 175 } 176 j++; 177 } 178 179 //skip over equal characters and stopping at the first nondigit char 180 //The nondigit character could be '.' 181 while (i <= end1 && j <= end2) { 182 if (!isdigit(a[i]) || a[i] != b[j]) { 183 break; 184 } 185 i++; j++; 186 } 187 if (i <= end1) { 188 first_a = a[i]; 189 } 190 if (j <= end2) { 191 first_b = b[j]; 192 } 193 //store the result of the difference. This could be final result if the 194 //number of digits in the mantissa is the same in both the numbers 195 int firstResult = first_a - first_b; 196 197 //check whether we hit a decimal in the earlier scan 198 if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) || 199 (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) { 200 return ((mul < 0) ? -decimalCompare(a, i, end1, b, j, end2) : 201 decimalCompare(a, i, end1, b, j, end2)); 202 } 203 //check the number of digits in the mantissa of the numbers 204 int numRemainDigits_a = 0; 205 int numRemainDigits_b = 0; 206 while (i <= end1) { 207 //if we encounter a non-digit treat the corresponding number as being 208 //smaller 209 if (isdigit(a[i++])) { 210 numRemainDigits_a++; 211 } else break; 212 } 213 while (j <= end2) { 214 //if we encounter a non-digit treat the corresponding number as being 215 //smaller 216 if (isdigit(b[j++])) { 217 numRemainDigits_b++; 218 } else break; 219 } 220 int ret = numRemainDigits_a - numRemainDigits_b; 221 if (ret == 0) { 222 return ((mul < 0) ? -firstResult : firstResult); 223 } else { 224 return ((mul < 0) ? -ret : ret); 225 } 226 } 227 private boolean isdigit(byte b) { 228 if ('0' <= b && b <= '9') { 229 return true; 230 } 231 return false; 232 } 233 private int decimalCompare(byte[] a, int i, int end1, 234 byte[] b, int j, int end2) { 235 if (i > end1) { 236 //if a[] has nothing remaining 237 return -decimalCompare1(b, ++j, end2); 238 } 239 if (j > end2) { 240 //if b[] has nothing remaining 241 return decimalCompare1(a, ++i, end1); 242 } 243 if (a[i] == DECIMAL && b[j] == DECIMAL) { 244 while (i <= end1 && j <= end2) { 245 if (a[i] != b[j]) { 246 if (isdigit(a[i]) && isdigit(b[j])) { 247 return a[i] - b[j]; 248 } 249 if (isdigit(a[i])) { 250 return 1; 251 } 252 if (isdigit(b[j])) { 253 return -1; 254 } 255 return 0; 256 } 257 i++; j++; 258 } 259 if (i > end1 && j > end2) { 260 return 0; 261 } 262 263 if (i > end1) { 264 //check whether there is a non-ZERO digit after potentially 265 //a number of ZEROs (e.g., a=.4444, b=.444400004) 266 return -decimalCompare1(b, j, end2); 267 } 268 if (j > end2) { 269 //check whether there is a non-ZERO digit after potentially 270 //a number of ZEROs (e.g., b=.4444, a=.444400004) 271 return decimalCompare1(a, i, end1); 272 } 273 } 274 else if (a[i] == DECIMAL) { 275 return decimalCompare1(a, ++i, end1); 276 } 277 else if (b[j] == DECIMAL) { 278 return -decimalCompare1(b, ++j, end2); 279 } 280 return 0; 281 } 282 283 private int decimalCompare1(byte[] a, int i, int end) { 284 while (i <= end) { 285 if (a[i] == ZERO) { 286 i++; 287 continue; 288 } 289 if (isdigit(a[i])) { 290 return 1; 291 } else { 292 return 0; 293 } 294 } 295 return 0; 296 } 297 298 private int oneNegativeCompare(byte[] a, int start1, int end1, 299 byte[] b, int start2, int end2) { 300 //here a[] is negative and b[] is positive 301 //We have to ascertain whether the number contains any digits. 302 //If it does, then it is a smaller number for sure. If not, 303 //then we need to scan b[] to find out whether b[] has a digit 304 //If b[] does contain a digit, then b[] is certainly 305 //greater. If not, that is, both a[] and b[] don't contain 306 //digits then they should be considered equal. 307 if (!isZero(a, start1, end1)) { 308 return -1; 309 } 310 //reached here - this means that a[] is a ZERO 311 if (!isZero(b, start2, end2)) { 312 return -1; 313 } 314 //reached here - both numbers are basically ZEROs and hence 315 //they should compare equal 316 return 0; 317 } 318 319 private boolean isZero(byte a[], int start, int end) { 320 //check for zeros in the significand part as well as the decimal part 321 //note that we treat the non-digit characters as ZERO 322 int i = start; 323 //we check the significand for being a ZERO 324 while (i <= end) { 325 if (a[i] != ZERO) { 326 if (a[i] != DECIMAL && isdigit(a[i])) { 327 return false; 328 } 329 break; 330 } 331 i++; 332 } 333 334 if (i != (end+1) && a[i++] == DECIMAL) { 335 //we check the decimal part for being a ZERO 336 while (i <= end) { 337 if (a[i] != ZERO) { 338 if (isdigit(a[i])) { 339 return false; 340 } 341 break; 342 } 343 i++; 344 } 345 } 346 return true; 347 } 348 /** 349 * Set the {@link KeyFieldBasedComparator} options used to compare keys. 350 * 351 * @param keySpec the key specification of the form -k pos1[,pos2], where, 352 * pos is of the form f[.c][opts], where f is the number 353 * of the key field to use, and c is the number of the first character from 354 * the beginning of the field. Fields and character posns are numbered 355 * starting with 1; a character position of zero in pos2 indicates the 356 * field's last character. If '.c' is omitted from pos1, it defaults to 1 357 * (the beginning of the field); if omitted from pos2, it defaults to 0 358 * (the end of the field). opts are ordering options. The supported options 359 * are: 360 * -n, (Sort numerically) 361 * -r, (Reverse the result of comparison) 362 */ 363 public static void setKeyFieldComparatorOptions(Job job, String keySpec) { 364 job.getConfiguration().set(COMPARATOR_OPTIONS, keySpec); 365 } 366 367 /** 368 * Get the {@link KeyFieldBasedComparator} options 369 */ 370 public static String getKeyFieldComparatorOption(JobContext job) { 371 return job.getConfiguration().get(COMPARATOR_OPTIONS); 372 } 373 374 375 }