001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.lib.partition;
020
021import java.util.List;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configurable;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.io.WritableComparator;
028import org.apache.hadoop.io.WritableUtils;
029import org.apache.hadoop.io.Text;
030import org.apache.hadoop.mapreduce.Job;
031import org.apache.hadoop.mapreduce.JobContext;
032import org.apache.hadoop.mapreduce.MRJobConfig;
033import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
034
035
036/**
037 * This comparator implementation provides a subset of the features provided
038 * by the Unix/GNU Sort. In particular, the supported features are:
039 * -n, (Sort numerically)
040 * -r, (Reverse the result of comparison)
041 * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
042 *  of the field to use, and c is the number of the first character from the
043 *  beginning of the field. Fields and character posns are numbered starting
044 *  with 1; a character position of zero in pos2 indicates the field's last
045 *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
046 *  of the field); if omitted from pos2, it defaults to 0 (the end of the
047 *  field). opts are ordering options (any of 'nr' as described above). 
048 * We assume that the fields in the key are separated by 
049 * {@link JobContext#MAP_OUTPUT_KEY_FIELD_SEPERATOR}.
050 */
051@InterfaceAudience.Public
052@InterfaceStability.Stable
053public class KeyFieldBasedComparator<K, V> extends WritableComparator 
054    implements Configurable {
055  private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
056  public static String COMPARATOR_OPTIONS = "mapreduce.partition.keycomparator.options";
057  private static final byte NEGATIVE = (byte)'-';
058  private static final byte ZERO = (byte)'0';
059  private static final byte DECIMAL = (byte)'.';
060  private Configuration conf;
061
062  public void setConf(Configuration conf) {
063    this.conf = conf;
064    String option = conf.get(COMPARATOR_OPTIONS);
065    String keyFieldSeparator = conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR,"\t");
066    keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
067    keyFieldHelper.parseOption(option);
068  }
069
070  public Configuration getConf() {
071    return conf;
072  }
073  
074  public KeyFieldBasedComparator() {
075    super(Text.class);
076  }
077    
078  public int compare(byte[] b1, int s1, int l1,
079      byte[] b2, int s2, int l2) {
080    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
081    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
082    List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
083
084    if (allKeySpecs.size() == 0) {
085      return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
086    }
087    
088    int []lengthIndicesFirst = 
089      keyFieldHelper.getWordLengths(b1, s1 + n1, s1 + l1);
090    int []lengthIndicesSecond = 
091      keyFieldHelper.getWordLengths(b2, s2 + n2, s2 + l2);
092    
093    for (KeyDescription keySpec : allKeySpecs) {
094      int startCharFirst = keyFieldHelper.getStartOffset(b1, s1 + n1, s1 + l1,
095        lengthIndicesFirst, keySpec);
096      int endCharFirst = keyFieldHelper.getEndOffset(b1, s1 + n1, s1 + l1, 
097        lengthIndicesFirst, keySpec);
098      int startCharSecond = keyFieldHelper.getStartOffset(b2, s2 + n2, s2 + l2,
099        lengthIndicesSecond, keySpec);
100      int endCharSecond = keyFieldHelper.getEndOffset(b2, s2 + n2, s2 + l2, 
101        lengthIndicesSecond, keySpec);
102      int result;
103      if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2, 
104          startCharSecond, endCharSecond, keySpec)) != 0) {
105        return result;
106      }
107    }
108    return 0;
109  }
110  
111  private int compareByteSequence(byte[] first, int start1, int end1, 
112      byte[] second, int start2, int end2, KeyDescription key) {
113    if (start1 == -1) {
114      if (key.reverse) {
115        return 1;
116      }
117      return -1;
118    }
119    if (start2 == -1) {
120      if (key.reverse) {
121        return -1; 
122      }
123      return 1;
124    }
125    int compareResult = 0;
126    if (!key.numeric) {
127      compareResult = compareBytes(first, start1, end1-start1 + 1, second,
128        start2, end2 - start2 + 1);
129    }
130    if (key.numeric) {
131      compareResult = numericalCompare (first, start1, end1, second, start2,
132        end2);
133    }
134    if (key.reverse) {
135      return -compareResult;
136    }
137    return compareResult;
138  }
139  
140  private int numericalCompare (byte[] a, int start1, int end1, 
141      byte[] b, int start2, int end2) {
142    int i = start1;
143    int j = start2;
144    int mul = 1;
145    byte first_a = a[i];
146    byte first_b = b[j];
147    if (first_a == NEGATIVE) {
148      if (first_b != NEGATIVE) {
149        //check for cases like -0.0 and 0.0 (they should be declared equal)
150        return oneNegativeCompare(a, start1 + 1, end1, b, start2, end2);
151      }
152      i++;
153    }
154    if (first_b == NEGATIVE) {
155      if (first_a != NEGATIVE) {
156        //check for cases like 0.0 and -0.0 (they should be declared equal)
157        return -oneNegativeCompare(b, start2+1, end2, a, start1, end1);
158      }
159      j++;
160    }
161    if (first_b == NEGATIVE && first_a == NEGATIVE) {
162      mul = -1;
163    }
164
165    //skip over ZEROs
166    while (i <= end1) {
167      if (a[i] != ZERO) {
168        break;
169      }
170      i++;
171    }
172    while (j <= end2) {
173      if (b[j] != ZERO) {
174        break;
175      }
176      j++;
177    }
178    
179    //skip over equal characters and stopping at the first nondigit char
180    //The nondigit character could be '.'
181    while (i <= end1 && j <= end2) {
182      if (!isdigit(a[i]) || a[i] != b[j]) {
183        break;
184      }
185      i++; j++;
186    }
187    if (i <= end1) {
188      first_a = a[i];
189    }
190    if (j <= end2) {
191      first_b = b[j];
192    }
193    //store the result of the difference. This could be final result if the
194    //number of digits in the mantissa is the same in both the numbers 
195    int firstResult = first_a - first_b;
196    
197    //check whether we hit a decimal in the earlier scan
198    if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) ||
199            (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) {
200      return ((mul < 0) ? -decimalCompare(a, i, end1, b, j, end2) : 
201        decimalCompare(a, i, end1, b, j, end2));
202    }
203    //check the number of digits in the mantissa of the numbers
204    int numRemainDigits_a = 0;
205    int numRemainDigits_b = 0;
206    while (i <= end1) {
207      //if we encounter a non-digit treat the corresponding number as being 
208      //smaller      
209      if (isdigit(a[i++])) {
210        numRemainDigits_a++;
211      } else break;
212    }
213    while (j <= end2) {
214      //if we encounter a non-digit treat the corresponding number as being 
215      //smaller
216      if (isdigit(b[j++])) {
217        numRemainDigits_b++;
218      } else break;
219    }
220    int ret = numRemainDigits_a - numRemainDigits_b;
221    if (ret == 0) { 
222      return ((mul < 0) ? -firstResult : firstResult);
223    } else {
224      return ((mul < 0) ? -ret : ret);
225    }
226  }
227  private boolean isdigit(byte b) {
228    if ('0' <= b && b <= '9') {
229      return true;
230    }
231    return false;
232  }
233  private int decimalCompare(byte[] a, int i, int end1, 
234                             byte[] b, int j, int end2) {
235    if (i > end1) {
236      //if a[] has nothing remaining
237      return -decimalCompare1(b, ++j, end2);
238    }
239    if (j > end2) {
240      //if b[] has nothing remaining
241      return decimalCompare1(a, ++i, end1);
242    }
243    if (a[i] == DECIMAL && b[j] == DECIMAL) {
244      while (i <= end1 && j <= end2) {
245        if (a[i] != b[j]) {
246          if (isdigit(a[i]) && isdigit(b[j])) {
247            return a[i] - b[j];
248          }
249          if (isdigit(a[i])) {
250            return 1;
251          }
252          if (isdigit(b[j])) {
253            return -1;
254          }
255          return 0;
256        }
257        i++; j++;
258      }
259      if (i > end1 && j > end2) {
260        return 0;
261      }
262        
263      if (i > end1) {
264        //check whether there is a non-ZERO digit after potentially
265        //a number of ZEROs (e.g., a=.4444, b=.444400004)
266        return -decimalCompare1(b, j, end2);
267      }
268      if (j > end2) {
269        //check whether there is a non-ZERO digit after potentially
270        //a number of ZEROs (e.g., b=.4444, a=.444400004)
271        return decimalCompare1(a, i, end1);
272      }
273    }
274    else if (a[i] == DECIMAL) {
275      return decimalCompare1(a, ++i, end1);
276    }
277    else if (b[j] == DECIMAL) {
278      return -decimalCompare1(b, ++j, end2);
279    }
280    return 0;
281  }
282  
283  private int decimalCompare1(byte[] a, int i, int end) {
284    while (i <= end) {
285      if (a[i] == ZERO) {
286        i++;
287        continue;
288      }
289      if (isdigit(a[i])) {
290        return 1;
291      } else {
292        return 0;
293      }
294    }
295    return 0;
296  }
297  
298  private int oneNegativeCompare(byte[] a, int start1, int end1, 
299      byte[] b, int start2, int end2) {
300    //here a[] is negative and b[] is positive
301    //We have to ascertain whether the number contains any digits.
302    //If it does, then it is a smaller number for sure. If not,
303    //then we need to scan b[] to find out whether b[] has a digit
304    //If b[] does contain a digit, then b[] is certainly
305    //greater. If not, that is, both a[] and b[] don't contain
306    //digits then they should be considered equal.
307    if (!isZero(a, start1, end1)) {
308      return -1;
309    }
310    //reached here - this means that a[] is a ZERO
311    if (!isZero(b, start2, end2)) {
312      return -1;
313    }
314    //reached here - both numbers are basically ZEROs and hence
315    //they should compare equal
316    return 0;
317  }
318  
319  private boolean isZero(byte a[], int start, int end) {
320    //check for zeros in the significand part as well as the decimal part
321    //note that we treat the non-digit characters as ZERO
322    int i = start;
323    //we check the significand for being a ZERO
324    while (i <= end) {
325      if (a[i] != ZERO) {
326        if (a[i] != DECIMAL && isdigit(a[i])) {
327          return false;
328        }
329        break;
330      }
331      i++;
332    }
333
334    if (i != (end+1) && a[i++] == DECIMAL) {
335      //we check the decimal part for being a ZERO
336      while (i <= end) {
337        if (a[i] != ZERO) {
338          if (isdigit(a[i])) {
339            return false;
340          }
341          break;
342        }
343        i++;
344      }
345    }
346    return true;
347  }
348  /**
349   * Set the {@link KeyFieldBasedComparator} options used to compare keys.
350   * 
351   * @param keySpec the key specification of the form -k pos1[,pos2], where,
352   *  pos is of the form f[.c][opts], where f is the number
353   *  of the key field to use, and c is the number of the first character from
354   *  the beginning of the field. Fields and character posns are numbered 
355   *  starting with 1; a character position of zero in pos2 indicates the
356   *  field's last character. If '.c' is omitted from pos1, it defaults to 1
357   *  (the beginning of the field); if omitted from pos2, it defaults to 0 
358   *  (the end of the field). opts are ordering options. The supported options
359   *  are:
360   *    -n, (Sort numerically)
361   *    -r, (Reverse the result of comparison)                 
362   */
363  public static void setKeyFieldComparatorOptions(Job job, String keySpec) {
364    job.getConfiguration().set(COMPARATOR_OPTIONS, keySpec);
365  }
366  
367  /**
368   * Get the {@link KeyFieldBasedComparator} options
369   */
370  public static String getKeyFieldComparatorOption(JobContext job) {
371    return job.getConfiguration().get(COMPARATOR_OPTIONS);
372  }
373
374
375}