001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io;
020    
021    import java.io.DataInput;
022    import java.io.IOException;
023    import java.util.concurrent.ConcurrentHashMap;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.conf.Configurable;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.util.ReflectionUtils;
030    
031    /** A Comparator for {@link WritableComparable}s.
032     *
033     * <p>This base implemenation uses the natural ordering.  To define alternate
034     * orderings, override {@link #compare(WritableComparable,WritableComparable)}.
035     *
036     * <p>One may optimize compare-intensive operations by overriding
037     * {@link #compare(byte[],int,int,byte[],int,int)}.  Static utility methods are
038     * provided to assist in optimized implementations of this method.
039     */
040    @InterfaceAudience.Public
041    @InterfaceStability.Stable
042    public class WritableComparator implements RawComparator, Configurable {
043    
044      private static final ConcurrentHashMap<Class, WritableComparator> comparators 
045              = new ConcurrentHashMap<Class, WritableComparator>(); // registry
046    
047      private Configuration conf;
048    
049      /** For backwards compatibility. **/
050      public static WritableComparator get(Class<? extends WritableComparable> c) {
051        return get(c, null);
052      }
053    
054      /** Get a comparator for a {@link WritableComparable} implementation. */
055      public static WritableComparator get(
056          Class<? extends WritableComparable> c, Configuration conf) {
057        WritableComparator comparator = comparators.get(c);
058        if (comparator == null) {
059          // force the static initializers to run
060          forceInit(c);
061          // look to see if it is defined now
062          comparator = comparators.get(c);
063          // if not, use the generic one
064          if (comparator == null) {
065            comparator = new WritableComparator(c, conf, true);
066          }
067        }
068        // Newly passed Configuration objects should be used.
069        ReflectionUtils.setConf(comparator, conf);
070        return comparator;
071      }
072    
073      @Override
074      public void setConf(Configuration conf) {
075        this.conf = conf;
076      }
077    
078      @Override
079      public Configuration getConf() {
080        return conf;
081      }
082    
083      /**
084       * Force initialization of the static members.
085       * As of Java 5, referencing a class doesn't force it to initialize. Since
086       * this class requires that the classes be initialized to declare their
087       * comparators, we force that initialization to happen.
088       * @param cls the class to initialize
089       */
090      private static void forceInit(Class<?> cls) {
091        try {
092          Class.forName(cls.getName(), true, cls.getClassLoader());
093        } catch (ClassNotFoundException e) {
094          throw new IllegalArgumentException("Can't initialize class " + cls, e);
095        }
096      } 
097    
098      /** Register an optimized comparator for a {@link WritableComparable}
099       * implementation. Comparators registered with this method must be
100       * thread-safe. */
101      public static void define(Class c, WritableComparator comparator) {
102        comparators.put(c, comparator);
103      }
104    
105      private final Class<? extends WritableComparable> keyClass;
106      private final WritableComparable key1;
107      private final WritableComparable key2;
108      private final DataInputBuffer buffer;
109    
110      protected WritableComparator() {
111        this(null);
112      }
113    
114      /** Construct for a {@link WritableComparable} implementation. */
115      protected WritableComparator(Class<? extends WritableComparable> keyClass) {
116        this(keyClass, null, false);
117      }
118    
119      protected WritableComparator(Class<? extends WritableComparable> keyClass,
120          boolean createInstances) {
121        this(keyClass, null, createInstances);
122      }
123    
124      protected WritableComparator(Class<? extends WritableComparable> keyClass,
125                                   Configuration conf,
126                                   boolean createInstances) {
127        this.keyClass = keyClass;
128        this.conf = (conf != null) ? conf : new Configuration();
129        if (createInstances) {
130          key1 = newKey();
131          key2 = newKey();
132          buffer = new DataInputBuffer();
133        } else {
134          key1 = key2 = null;
135          buffer = null;
136        }
137      }
138    
139      /** Returns the WritableComparable implementation class. */
140      public Class<? extends WritableComparable> getKeyClass() { return keyClass; }
141    
142      /** Construct a new {@link WritableComparable} instance. */
143      public WritableComparable newKey() {
144        return ReflectionUtils.newInstance(keyClass, conf);
145      }
146    
147      /** Optimization hook.  Override this to make SequenceFile.Sorter's scream.
148       *
149       * <p>The default implementation reads the data into two {@link
150       * WritableComparable}s (using {@link
151       * Writable#readFields(DataInput)}, then calls {@link
152       * #compare(WritableComparable,WritableComparable)}.
153       */
154      @Override
155      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
156        try {
157          buffer.reset(b1, s1, l1);                   // parse key1
158          key1.readFields(buffer);
159          
160          buffer.reset(b2, s2, l2);                   // parse key2
161          key2.readFields(buffer);
162          
163        } catch (IOException e) {
164          throw new RuntimeException(e);
165        }
166        
167        return compare(key1, key2);                   // compare them
168      }
169    
170      /** Compare two WritableComparables.
171       *
172       * <p> The default implementation uses the natural ordering, calling {@link
173       * Comparable#compareTo(Object)}. */
174      @SuppressWarnings("unchecked")
175      public int compare(WritableComparable a, WritableComparable b) {
176        return a.compareTo(b);
177      }
178    
179      @Override
180      public int compare(Object a, Object b) {
181        return compare((WritableComparable)a, (WritableComparable)b);
182      }
183    
184      /** Lexicographic order of binary data. */
185      public static int compareBytes(byte[] b1, int s1, int l1,
186                                     byte[] b2, int s2, int l2) {
187        return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
188      }
189    
190      /** Compute hash for binary data. */
191      public static int hashBytes(byte[] bytes, int offset, int length) {
192        int hash = 1;
193        for (int i = offset; i < offset + length; i++)
194          hash = (31 * hash) + (int)bytes[i];
195        return hash;
196      }
197      
198      /** Compute hash for binary data. */
199      public static int hashBytes(byte[] bytes, int length) {
200        return hashBytes(bytes, 0, length);
201      }
202    
203      /** Parse an unsigned short from a byte array. */
204      public static int readUnsignedShort(byte[] bytes, int start) {
205        return (((bytes[start]   & 0xff) <<  8) +
206                ((bytes[start+1] & 0xff)));
207      }
208    
209      /** Parse an integer from a byte array. */
210      public static int readInt(byte[] bytes, int start) {
211        return (((bytes[start  ] & 0xff) << 24) +
212                ((bytes[start+1] & 0xff) << 16) +
213                ((bytes[start+2] & 0xff) <<  8) +
214                ((bytes[start+3] & 0xff)));
215    
216      }
217    
218      /** Parse a float from a byte array. */
219      public static float readFloat(byte[] bytes, int start) {
220        return Float.intBitsToFloat(readInt(bytes, start));
221      }
222    
223      /** Parse a long from a byte array. */
224      public static long readLong(byte[] bytes, int start) {
225        return ((long)(readInt(bytes, start)) << 32) +
226          (readInt(bytes, start+4) & 0xFFFFFFFFL);
227      }
228    
229      /** Parse a double from a byte array. */
230      public static double readDouble(byte[] bytes, int start) {
231        return Double.longBitsToDouble(readLong(bytes, start));
232      }
233    
234      /**
235       * Reads a zero-compressed encoded long from a byte array and returns it.
236       * @param bytes byte array with decode long
237       * @param start starting index
238       * @throws java.io.IOException 
239       * @return deserialized long
240       */
241      public static long readVLong(byte[] bytes, int start) throws IOException {
242        int len = bytes[start];
243        if (len >= -112) {
244          return len;
245        }
246        boolean isNegative = (len < -120);
247        len = isNegative ? -(len + 120) : -(len + 112);
248        if (start+1+len>bytes.length)
249          throw new IOException(
250                                "Not enough number of bytes for a zero-compressed integer");
251        long i = 0;
252        for (int idx = 0; idx < len; idx++) {
253          i = i << 8;
254          i = i | (bytes[start+1+idx] & 0xFF);
255        }
256        return (isNegative ? (i ^ -1L) : i);
257      }
258      
259      /**
260       * Reads a zero-compressed encoded integer from a byte array and returns it.
261       * @param bytes byte array with the encoded integer
262       * @param start start index
263       * @throws java.io.IOException 
264       * @return deserialized integer
265       */
266      public static int readVInt(byte[] bytes, int start) throws IOException {
267        return (int) readVLong(bytes, start);
268      }
269    }