001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.mapreduce.lib.partition;
020    
021    import java.util.List;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configurable;
026    import org.apache.hadoop.conf.Configuration;
027    import org.apache.hadoop.io.WritableComparator;
028    import org.apache.hadoop.io.WritableUtils;
029    import org.apache.hadoop.io.Text;
030    import org.apache.hadoop.mapreduce.Job;
031    import org.apache.hadoop.mapreduce.JobContext;
032    import org.apache.hadoop.mapreduce.MRJobConfig;
033    import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
034    
035    
036    /**
037     * This comparator implementation provides a subset of the features provided
038     * by the Unix/GNU Sort. In particular, the supported features are:
039     * -n, (Sort numerically)
040     * -r, (Reverse the result of comparison)
041     * -k pos1[,pos2], where pos is of the form f[.c][opts], where f is the number
042     *  of the field to use, and c is the number of the first character from the
043     *  beginning of the field. Fields and character posns are numbered starting
044     *  with 1; a character position of zero in pos2 indicates the field's last
045     *  character. If '.c' is omitted from pos1, it defaults to 1 (the beginning
046     *  of the field); if omitted from pos2, it defaults to 0 (the end of the
047     *  field). opts are ordering options (any of 'nr' as described above). 
048     * We assume that the fields in the key are separated by 
049     * {@link JobContext#MAP_OUTPUT_KEY_FIELD_SEPERATOR}.
050     */
051    @InterfaceAudience.Public
052    @InterfaceStability.Stable
053    public class KeyFieldBasedComparator<K, V> extends WritableComparator 
054        implements Configurable {
055      private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
056      public static String COMPARATOR_OPTIONS = "mapreduce.partition.keycomparator.options";
057      private static final byte NEGATIVE = (byte)'-';
058      private static final byte ZERO = (byte)'0';
059      private static final byte DECIMAL = (byte)'.';
060      private Configuration conf;
061    
062      public void setConf(Configuration conf) {
063        this.conf = conf;
064        String option = conf.get(COMPARATOR_OPTIONS);
065        String keyFieldSeparator = conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR,"\t");
066        keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
067        keyFieldHelper.parseOption(option);
068      }
069    
070      public Configuration getConf() {
071        return conf;
072      }
073      
074      public KeyFieldBasedComparator() {
075        super(Text.class);
076      }
077        
078      public int compare(byte[] b1, int s1, int l1,
079          byte[] b2, int s2, int l2) {
080        int n1 = WritableUtils.decodeVIntSize(b1[s1]);
081        int n2 = WritableUtils.decodeVIntSize(b2[s2]);
082        List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
083    
084        if (allKeySpecs.size() == 0) {
085          return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
086        }
087        
088        int []lengthIndicesFirst = 
089          keyFieldHelper.getWordLengths(b1, s1 + n1, s1 + l1);
090        int []lengthIndicesSecond = 
091          keyFieldHelper.getWordLengths(b2, s2 + n2, s2 + l2);
092        
093        for (KeyDescription keySpec : allKeySpecs) {
094          int startCharFirst = keyFieldHelper.getStartOffset(b1, s1 + n1, s1 + l1,
095            lengthIndicesFirst, keySpec);
096          int endCharFirst = keyFieldHelper.getEndOffset(b1, s1 + n1, s1 + l1, 
097            lengthIndicesFirst, keySpec);
098          int startCharSecond = keyFieldHelper.getStartOffset(b2, s2 + n2, s2 + l2,
099            lengthIndicesSecond, keySpec);
100          int endCharSecond = keyFieldHelper.getEndOffset(b2, s2 + n2, s2 + l2, 
101            lengthIndicesSecond, keySpec);
102          int result;
103          if ((result = compareByteSequence(b1, startCharFirst, endCharFirst, b2, 
104              startCharSecond, endCharSecond, keySpec)) != 0) {
105            return result;
106          }
107        }
108        return 0;
109      }
110      
111      private int compareByteSequence(byte[] first, int start1, int end1, 
112          byte[] second, int start2, int end2, KeyDescription key) {
113        if (start1 == -1) {
114          if (key.reverse) {
115            return 1;
116          }
117          return -1;
118        }
119        if (start2 == -1) {
120          if (key.reverse) {
121            return -1; 
122          }
123          return 1;
124        }
125        int compareResult = 0;
126        if (!key.numeric) {
127          compareResult = compareBytes(first, start1, end1-start1 + 1, second,
128            start2, end2 - start2 + 1);
129        }
130        if (key.numeric) {
131          compareResult = numericalCompare (first, start1, end1, second, start2,
132            end2);
133        }
134        if (key.reverse) {
135          return -compareResult;
136        }
137        return compareResult;
138      }
139      
140      private int numericalCompare (byte[] a, int start1, int end1, 
141          byte[] b, int start2, int end2) {
142        int i = start1;
143        int j = start2;
144        int mul = 1;
145        byte first_a = a[i];
146        byte first_b = b[j];
147        if (first_a == NEGATIVE) {
148          if (first_b != NEGATIVE) {
149            //check for cases like -0.0 and 0.0 (they should be declared equal)
150            return oneNegativeCompare(a, start1 + 1, end1, b, start2, end2);
151          }
152          i++;
153        }
154        if (first_b == NEGATIVE) {
155          if (first_a != NEGATIVE) {
156            //check for cases like 0.0 and -0.0 (they should be declared equal)
157            return -oneNegativeCompare(b, start2+1, end2, a, start1, end1);
158          }
159          j++;
160        }
161        if (first_b == NEGATIVE && first_a == NEGATIVE) {
162          mul = -1;
163        }
164    
165        //skip over ZEROs
166        while (i <= end1) {
167          if (a[i] != ZERO) {
168            break;
169          }
170          i++;
171        }
172        while (j <= end2) {
173          if (b[j] != ZERO) {
174            break;
175          }
176          j++;
177        }
178        
179        //skip over equal characters and stopping at the first nondigit char
180        //The nondigit character could be '.'
181        while (i <= end1 && j <= end2) {
182          if (!isdigit(a[i]) || a[i] != b[j]) {
183            break;
184          }
185          i++; j++;
186        }
187        if (i <= end1) {
188          first_a = a[i];
189        }
190        if (j <= end2) {
191          first_b = b[j];
192        }
193        //store the result of the difference. This could be final result if the
194        //number of digits in the mantissa is the same in both the numbers 
195        int firstResult = first_a - first_b;
196        
197        //check whether we hit a decimal in the earlier scan
198        if ((first_a == DECIMAL && (!isdigit(first_b) || j > end2)) ||
199                (first_b == DECIMAL && (!isdigit(first_a) || i > end1))) {
200          return ((mul < 0) ? -decimalCompare(a, i, end1, b, j, end2) : 
201            decimalCompare(a, i, end1, b, j, end2));
202        }
203        //check the number of digits in the mantissa of the numbers
204        int numRemainDigits_a = 0;
205        int numRemainDigits_b = 0;
206        while (i <= end1) {
207          //if we encounter a non-digit treat the corresponding number as being 
208          //smaller      
209          if (isdigit(a[i++])) {
210            numRemainDigits_a++;
211          } else break;
212        }
213        while (j <= end2) {
214          //if we encounter a non-digit treat the corresponding number as being 
215          //smaller
216          if (isdigit(b[j++])) {
217            numRemainDigits_b++;
218          } else break;
219        }
220        int ret = numRemainDigits_a - numRemainDigits_b;
221        if (ret == 0) { 
222          return ((mul < 0) ? -firstResult : firstResult);
223        } else {
224          return ((mul < 0) ? -ret : ret);
225        }
226      }
227      private boolean isdigit(byte b) {
228        if ('0' <= b && b <= '9') {
229          return true;
230        }
231        return false;
232      }
233      private int decimalCompare(byte[] a, int i, int end1, 
234                                 byte[] b, int j, int end2) {
235        if (i > end1) {
236          //if a[] has nothing remaining
237          return -decimalCompare1(b, ++j, end2);
238        }
239        if (j > end2) {
240          //if b[] has nothing remaining
241          return decimalCompare1(a, ++i, end1);
242        }
243        if (a[i] == DECIMAL && b[j] == DECIMAL) {
244          while (i <= end1 && j <= end2) {
245            if (a[i] != b[j]) {
246              if (isdigit(a[i]) && isdigit(b[j])) {
247                return a[i] - b[j];
248              }
249              if (isdigit(a[i])) {
250                return 1;
251              }
252              if (isdigit(b[j])) {
253                return -1;
254              }
255              return 0;
256            }
257            i++; j++;
258          }
259          if (i > end1 && j > end2) {
260            return 0;
261          }
262            
263          if (i > end1) {
264            //check whether there is a non-ZERO digit after potentially
265            //a number of ZEROs (e.g., a=.4444, b=.444400004)
266            return -decimalCompare1(b, j, end2);
267          }
268          if (j > end2) {
269            //check whether there is a non-ZERO digit after potentially
270            //a number of ZEROs (e.g., b=.4444, a=.444400004)
271            return decimalCompare1(a, i, end1);
272          }
273        }
274        else if (a[i] == DECIMAL) {
275          return decimalCompare1(a, ++i, end1);
276        }
277        else if (b[j] == DECIMAL) {
278          return -decimalCompare1(b, ++j, end2);
279        }
280        return 0;
281      }
282      
283      private int decimalCompare1(byte[] a, int i, int end) {
284        while (i <= end) {
285          if (a[i] == ZERO) {
286            i++;
287            continue;
288          }
289          if (isdigit(a[i])) {
290            return 1;
291          } else {
292            return 0;
293          }
294        }
295        return 0;
296      }
297      
298      private int oneNegativeCompare(byte[] a, int start1, int end1, 
299          byte[] b, int start2, int end2) {
300        //here a[] is negative and b[] is positive
301        //We have to ascertain whether the number contains any digits.
302        //If it does, then it is a smaller number for sure. If not,
303        //then we need to scan b[] to find out whether b[] has a digit
304        //If b[] does contain a digit, then b[] is certainly
305        //greater. If not, that is, both a[] and b[] don't contain
306        //digits then they should be considered equal.
307        if (!isZero(a, start1, end1)) {
308          return -1;
309        }
310        //reached here - this means that a[] is a ZERO
311        if (!isZero(b, start2, end2)) {
312          return -1;
313        }
314        //reached here - both numbers are basically ZEROs and hence
315        //they should compare equal
316        return 0;
317      }
318      
319      private boolean isZero(byte a[], int start, int end) {
320        //check for zeros in the significand part as well as the decimal part
321        //note that we treat the non-digit characters as ZERO
322        int i = start;
323        //we check the significand for being a ZERO
324        while (i <= end) {
325          if (a[i] != ZERO) {
326            if (a[i] != DECIMAL && isdigit(a[i])) {
327              return false;
328            }
329            break;
330          }
331          i++;
332        }
333    
334        if (i != (end+1) && a[i++] == DECIMAL) {
335          //we check the decimal part for being a ZERO
336          while (i <= end) {
337            if (a[i] != ZERO) {
338              if (isdigit(a[i])) {
339                return false;
340              }
341              break;
342            }
343            i++;
344          }
345        }
346        return true;
347      }
348      /**
349       * Set the {@link KeyFieldBasedComparator} options used to compare keys.
350       * 
351       * @param keySpec the key specification of the form -k pos1[,pos2], where,
352       *  pos is of the form f[.c][opts], where f is the number
353       *  of the key field to use, and c is the number of the first character from
354       *  the beginning of the field. Fields and character posns are numbered 
355       *  starting with 1; a character position of zero in pos2 indicates the
356       *  field's last character. If '.c' is omitted from pos1, it defaults to 1
357       *  (the beginning of the field); if omitted from pos2, it defaults to 0 
358       *  (the end of the field). opts are ordering options. The supported options
359       *  are:
360       *    -n, (Sort numerically)
361       *    -r, (Reverse the result of comparison)                 
362       */
363      public static void setKeyFieldComparatorOptions(Job job, String keySpec) {
364        job.getConfiguration().set(COMPARATOR_OPTIONS, keySpec);
365      }
366      
367      /**
368       * Get the {@link KeyFieldBasedComparator} options
369       */
370      public static String getKeyFieldComparatorOption(JobContext job) {
371        return job.getConfiguration().get(COMPARATOR_OPTIONS);
372      }
373    
374    
375    }