001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.io.DataInput;
022 import java.io.IOException;
023 import java.util.concurrent.ConcurrentHashMap;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.util.ReflectionUtils;
028
029 /** A Comparator for {@link WritableComparable}s.
030 *
031 * <p>This base implemenation uses the natural ordering. To define alternate
032 * orderings, override {@link #compare(WritableComparable,WritableComparable)}.
033 *
034 * <p>One may optimize compare-intensive operations by overriding
035 * {@link #compare(byte[],int,int,byte[],int,int)}. Static utility methods are
036 * provided to assist in optimized implementations of this method.
037 */
038 @InterfaceAudience.Public
039 @InterfaceStability.Stable
040 public class WritableComparator implements RawComparator {
041
042 private static final ConcurrentHashMap<Class, WritableComparator> comparators
043 = new ConcurrentHashMap<Class, WritableComparator>(); // registry
044
045 /** Get a comparator for a {@link WritableComparable} implementation. */
046 public static WritableComparator get(Class<? extends WritableComparable> c) {
047 WritableComparator comparator = comparators.get(c);
048 if (comparator == null) {
049 // force the static initializers to run
050 forceInit(c);
051 // look to see if it is defined now
052 comparator = comparators.get(c);
053 // if not, use the generic one
054 if (comparator == null) {
055 comparator = new WritableComparator(c, true);
056 }
057 }
058 return comparator;
059 }
060
061 /**
062 * Force initialization of the static members.
063 * As of Java 5, referencing a class doesn't force it to initialize. Since
064 * this class requires that the classes be initialized to declare their
065 * comparators, we force that initialization to happen.
066 * @param cls the class to initialize
067 */
068 private static void forceInit(Class<?> cls) {
069 try {
070 Class.forName(cls.getName(), true, cls.getClassLoader());
071 } catch (ClassNotFoundException e) {
072 throw new IllegalArgumentException("Can't initialize class " + cls, e);
073 }
074 }
075
076 /** Register an optimized comparator for a {@link WritableComparable}
077 * implementation. Comparators registered with this method must be
078 * thread-safe. */
079 public static void define(Class c, WritableComparator comparator) {
080 comparators.put(c, comparator);
081 }
082
083 private final Class<? extends WritableComparable> keyClass;
084 private final WritableComparable key1;
085 private final WritableComparable key2;
086 private final DataInputBuffer buffer;
087
088 protected WritableComparator() {
089 this(null);
090 }
091
092 /** Construct for a {@link WritableComparable} implementation. */
093 protected WritableComparator(Class<? extends WritableComparable> keyClass) {
094 this(keyClass, false);
095 }
096
097 protected WritableComparator(Class<? extends WritableComparable> keyClass,
098 boolean createInstances) {
099 this.keyClass = keyClass;
100 if (createInstances) {
101 key1 = newKey();
102 key2 = newKey();
103 buffer = new DataInputBuffer();
104 } else {
105 key1 = key2 = null;
106 buffer = null;
107 }
108 }
109
110 /** Returns the WritableComparable implementation class. */
111 public Class<? extends WritableComparable> getKeyClass() { return keyClass; }
112
113 /** Construct a new {@link WritableComparable} instance. */
114 public WritableComparable newKey() {
115 return ReflectionUtils.newInstance(keyClass, null);
116 }
117
118 /** Optimization hook. Override this to make SequenceFile.Sorter's scream.
119 *
120 * <p>The default implementation reads the data into two {@link
121 * WritableComparable}s (using {@link
122 * Writable#readFields(DataInput)}, then calls {@link
123 * #compare(WritableComparable,WritableComparable)}.
124 */
125 @Override
126 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
127 try {
128 buffer.reset(b1, s1, l1); // parse key1
129 key1.readFields(buffer);
130
131 buffer.reset(b2, s2, l2); // parse key2
132 key2.readFields(buffer);
133
134 } catch (IOException e) {
135 throw new RuntimeException(e);
136 }
137
138 return compare(key1, key2); // compare them
139 }
140
141 /** Compare two WritableComparables.
142 *
143 * <p> The default implementation uses the natural ordering, calling {@link
144 * Comparable#compareTo(Object)}. */
145 @SuppressWarnings("unchecked")
146 public int compare(WritableComparable a, WritableComparable b) {
147 return a.compareTo(b);
148 }
149
150 @Override
151 public int compare(Object a, Object b) {
152 return compare((WritableComparable)a, (WritableComparable)b);
153 }
154
155 /** Lexicographic order of binary data. */
156 public static int compareBytes(byte[] b1, int s1, int l1,
157 byte[] b2, int s2, int l2) {
158 return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
159 }
160
161 /** Compute hash for binary data. */
162 public static int hashBytes(byte[] bytes, int offset, int length) {
163 int hash = 1;
164 for (int i = offset; i < offset + length; i++)
165 hash = (31 * hash) + (int)bytes[i];
166 return hash;
167 }
168
169 /** Compute hash for binary data. */
170 public static int hashBytes(byte[] bytes, int length) {
171 return hashBytes(bytes, 0, length);
172 }
173
174 /** Parse an unsigned short from a byte array. */
175 public static int readUnsignedShort(byte[] bytes, int start) {
176 return (((bytes[start] & 0xff) << 8) +
177 ((bytes[start+1] & 0xff)));
178 }
179
180 /** Parse an integer from a byte array. */
181 public static int readInt(byte[] bytes, int start) {
182 return (((bytes[start ] & 0xff) << 24) +
183 ((bytes[start+1] & 0xff) << 16) +
184 ((bytes[start+2] & 0xff) << 8) +
185 ((bytes[start+3] & 0xff)));
186
187 }
188
189 /** Parse a float from a byte array. */
190 public static float readFloat(byte[] bytes, int start) {
191 return Float.intBitsToFloat(readInt(bytes, start));
192 }
193
194 /** Parse a long from a byte array. */
195 public static long readLong(byte[] bytes, int start) {
196 return ((long)(readInt(bytes, start)) << 32) +
197 (readInt(bytes, start+4) & 0xFFFFFFFFL);
198 }
199
200 /** Parse a double from a byte array. */
201 public static double readDouble(byte[] bytes, int start) {
202 return Double.longBitsToDouble(readLong(bytes, start));
203 }
204
205 /**
206 * Reads a zero-compressed encoded long from a byte array and returns it.
207 * @param bytes byte array with decode long
208 * @param start starting index
209 * @throws java.io.IOException
210 * @return deserialized long
211 */
212 public static long readVLong(byte[] bytes, int start) throws IOException {
213 int len = bytes[start];
214 if (len >= -112) {
215 return len;
216 }
217 boolean isNegative = (len < -120);
218 len = isNegative ? -(len + 120) : -(len + 112);
219 if (start+1+len>bytes.length)
220 throw new IOException(
221 "Not enough number of bytes for a zero-compressed integer");
222 long i = 0;
223 for (int idx = 0; idx < len; idx++) {
224 i = i << 8;
225 i = i | (bytes[start+1+idx] & 0xFF);
226 }
227 return (isNegative ? (i ^ -1L) : i);
228 }
229
230 /**
231 * Reads a zero-compressed encoded integer from a byte array and returns it.
232 * @param bytes byte array with the encoded integer
233 * @param start start index
234 * @throws java.io.IOException
235 * @return deserialized integer
236 */
237 public static int readVInt(byte[] bytes, int start) throws IOException {
238 return (int) readVLong(bytes, start);
239 }
240 }