001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.io.DataInput;
022 import java.io.IOException;
023 import java.util.concurrent.ConcurrentHashMap;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.conf.Configurable;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.util.ReflectionUtils;
030
031 /** A Comparator for {@link WritableComparable}s.
032 *
033 * <p>This base implemenation uses the natural ordering. To define alternate
034 * orderings, override {@link #compare(WritableComparable,WritableComparable)}.
035 *
036 * <p>One may optimize compare-intensive operations by overriding
037 * {@link #compare(byte[],int,int,byte[],int,int)}. Static utility methods are
038 * provided to assist in optimized implementations of this method.
039 */
040 @InterfaceAudience.Public
041 @InterfaceStability.Stable
042 public class WritableComparator implements RawComparator, Configurable {
043
044 private static final ConcurrentHashMap<Class, WritableComparator> comparators
045 = new ConcurrentHashMap<Class, WritableComparator>(); // registry
046
047 private Configuration conf;
048
049 /** For backwards compatibility. **/
050 public static WritableComparator get(Class<? extends WritableComparable> c) {
051 return get(c, null);
052 }
053
054 /** Get a comparator for a {@link WritableComparable} implementation. */
055 public static WritableComparator get(
056 Class<? extends WritableComparable> c, Configuration conf) {
057 WritableComparator comparator = comparators.get(c);
058 if (comparator == null) {
059 // force the static initializers to run
060 forceInit(c);
061 // look to see if it is defined now
062 comparator = comparators.get(c);
063 // if not, use the generic one
064 if (comparator == null) {
065 comparator = new WritableComparator(c, conf, true);
066 }
067 }
068 // Newly passed Configuration objects should be used.
069 ReflectionUtils.setConf(comparator, conf);
070 return comparator;
071 }
072
073 @Override
074 public void setConf(Configuration conf) {
075 this.conf = conf;
076 }
077
078 @Override
079 public Configuration getConf() {
080 return conf;
081 }
082
083 /**
084 * Force initialization of the static members.
085 * As of Java 5, referencing a class doesn't force it to initialize. Since
086 * this class requires that the classes be initialized to declare their
087 * comparators, we force that initialization to happen.
088 * @param cls the class to initialize
089 */
090 private static void forceInit(Class<?> cls) {
091 try {
092 Class.forName(cls.getName(), true, cls.getClassLoader());
093 } catch (ClassNotFoundException e) {
094 throw new IllegalArgumentException("Can't initialize class " + cls, e);
095 }
096 }
097
098 /** Register an optimized comparator for a {@link WritableComparable}
099 * implementation. Comparators registered with this method must be
100 * thread-safe. */
101 public static void define(Class c, WritableComparator comparator) {
102 comparators.put(c, comparator);
103 }
104
105 private final Class<? extends WritableComparable> keyClass;
106 private final WritableComparable key1;
107 private final WritableComparable key2;
108 private final DataInputBuffer buffer;
109
110 protected WritableComparator() {
111 this(null);
112 }
113
114 /** Construct for a {@link WritableComparable} implementation. */
115 protected WritableComparator(Class<? extends WritableComparable> keyClass) {
116 this(keyClass, null, false);
117 }
118
119 protected WritableComparator(Class<? extends WritableComparable> keyClass,
120 boolean createInstances) {
121 this(keyClass, null, createInstances);
122 }
123
124 protected WritableComparator(Class<? extends WritableComparable> keyClass,
125 Configuration conf,
126 boolean createInstances) {
127 this.keyClass = keyClass;
128 this.conf = (conf != null) ? conf : new Configuration();
129 if (createInstances) {
130 key1 = newKey();
131 key2 = newKey();
132 buffer = new DataInputBuffer();
133 } else {
134 key1 = key2 = null;
135 buffer = null;
136 }
137 }
138
139 /** Returns the WritableComparable implementation class. */
140 public Class<? extends WritableComparable> getKeyClass() { return keyClass; }
141
142 /** Construct a new {@link WritableComparable} instance. */
143 public WritableComparable newKey() {
144 return ReflectionUtils.newInstance(keyClass, conf);
145 }
146
147 /** Optimization hook. Override this to make SequenceFile.Sorter's scream.
148 *
149 * <p>The default implementation reads the data into two {@link
150 * WritableComparable}s (using {@link
151 * Writable#readFields(DataInput)}, then calls {@link
152 * #compare(WritableComparable,WritableComparable)}.
153 */
154 @Override
155 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
156 try {
157 buffer.reset(b1, s1, l1); // parse key1
158 key1.readFields(buffer);
159
160 buffer.reset(b2, s2, l2); // parse key2
161 key2.readFields(buffer);
162
163 } catch (IOException e) {
164 throw new RuntimeException(e);
165 }
166
167 return compare(key1, key2); // compare them
168 }
169
170 /** Compare two WritableComparables.
171 *
172 * <p> The default implementation uses the natural ordering, calling {@link
173 * Comparable#compareTo(Object)}. */
174 @SuppressWarnings("unchecked")
175 public int compare(WritableComparable a, WritableComparable b) {
176 return a.compareTo(b);
177 }
178
179 @Override
180 public int compare(Object a, Object b) {
181 return compare((WritableComparable)a, (WritableComparable)b);
182 }
183
184 /** Lexicographic order of binary data. */
185 public static int compareBytes(byte[] b1, int s1, int l1,
186 byte[] b2, int s2, int l2) {
187 return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
188 }
189
190 /** Compute hash for binary data. */
191 public static int hashBytes(byte[] bytes, int offset, int length) {
192 int hash = 1;
193 for (int i = offset; i < offset + length; i++)
194 hash = (31 * hash) + (int)bytes[i];
195 return hash;
196 }
197
198 /** Compute hash for binary data. */
199 public static int hashBytes(byte[] bytes, int length) {
200 return hashBytes(bytes, 0, length);
201 }
202
203 /** Parse an unsigned short from a byte array. */
204 public static int readUnsignedShort(byte[] bytes, int start) {
205 return (((bytes[start] & 0xff) << 8) +
206 ((bytes[start+1] & 0xff)));
207 }
208
209 /** Parse an integer from a byte array. */
210 public static int readInt(byte[] bytes, int start) {
211 return (((bytes[start ] & 0xff) << 24) +
212 ((bytes[start+1] & 0xff) << 16) +
213 ((bytes[start+2] & 0xff) << 8) +
214 ((bytes[start+3] & 0xff)));
215
216 }
217
218 /** Parse a float from a byte array. */
219 public static float readFloat(byte[] bytes, int start) {
220 return Float.intBitsToFloat(readInt(bytes, start));
221 }
222
223 /** Parse a long from a byte array. */
224 public static long readLong(byte[] bytes, int start) {
225 return ((long)(readInt(bytes, start)) << 32) +
226 (readInt(bytes, start+4) & 0xFFFFFFFFL);
227 }
228
229 /** Parse a double from a byte array. */
230 public static double readDouble(byte[] bytes, int start) {
231 return Double.longBitsToDouble(readLong(bytes, start));
232 }
233
234 /**
235 * Reads a zero-compressed encoded long from a byte array and returns it.
236 * @param bytes byte array with decode long
237 * @param start starting index
238 * @throws java.io.IOException
239 * @return deserialized long
240 */
241 public static long readVLong(byte[] bytes, int start) throws IOException {
242 int len = bytes[start];
243 if (len >= -112) {
244 return len;
245 }
246 boolean isNegative = (len < -120);
247 len = isNegative ? -(len + 120) : -(len + 112);
248 if (start+1+len>bytes.length)
249 throw new IOException(
250 "Not enough number of bytes for a zero-compressed integer");
251 long i = 0;
252 for (int idx = 0; idx < len; idx++) {
253 i = i << 8;
254 i = i | (bytes[start+1+idx] & 0xFF);
255 }
256 return (isNegative ? (i ^ -1L) : i);
257 }
258
259 /**
260 * Reads a zero-compressed encoded integer from a byte array and returns it.
261 * @param bytes byte array with the encoded integer
262 * @param start start index
263 * @throws java.io.IOException
264 * @return deserialized integer
265 */
266 public static int readVInt(byte[] bytes, int start) throws IOException {
267 return (int) readVLong(bytes, start);
268 }
269 }