001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.DataInput; 022import java.io.IOException; 023import java.util.concurrent.ConcurrentHashMap; 024 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceStability; 027import org.apache.hadoop.conf.Configurable; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.util.ReflectionUtils; 030 031/** A Comparator for {@link WritableComparable}s. 032 * 033 * <p>This base implemenation uses the natural ordering. To define alternate 034 * orderings, override {@link #compare(WritableComparable,WritableComparable)}. 035 * 036 * <p>One may optimize compare-intensive operations by overriding 037 * {@link #compare(byte[],int,int,byte[],int,int)}. Static utility methods are 038 * provided to assist in optimized implementations of this method. 039 */ 040@InterfaceAudience.Public 041@InterfaceStability.Stable 042public class WritableComparator implements RawComparator, Configurable { 043 044 private static final ConcurrentHashMap<Class, WritableComparator> comparators 045 = new ConcurrentHashMap<Class, WritableComparator>(); // registry 046 047 private Configuration conf; 048 049 /** For backwards compatibility. **/ 050 public static WritableComparator get(Class<? extends WritableComparable> c) { 051 return get(c, null); 052 } 053 054 /** Get a comparator for a {@link WritableComparable} implementation. */ 055 public static WritableComparator get( 056 Class<? extends WritableComparable> c, Configuration conf) { 057 WritableComparator comparator = comparators.get(c); 058 if (comparator == null) { 059 // force the static initializers to run 060 forceInit(c); 061 // look to see if it is defined now 062 comparator = comparators.get(c); 063 // if not, use the generic one 064 if (comparator == null) { 065 comparator = new WritableComparator(c, conf, true); 066 } 067 } 068 // Newly passed Configuration objects should be used. 069 ReflectionUtils.setConf(comparator, conf); 070 return comparator; 071 } 072 073 @Override 074 public void setConf(Configuration conf) { 075 this.conf = conf; 076 } 077 078 @Override 079 public Configuration getConf() { 080 return conf; 081 } 082 083 /** 084 * Force initialization of the static members. 085 * As of Java 5, referencing a class doesn't force it to initialize. Since 086 * this class requires that the classes be initialized to declare their 087 * comparators, we force that initialization to happen. 088 * @param cls the class to initialize 089 */ 090 private static void forceInit(Class<?> cls) { 091 try { 092 Class.forName(cls.getName(), true, cls.getClassLoader()); 093 } catch (ClassNotFoundException e) { 094 throw new IllegalArgumentException("Can't initialize class " + cls, e); 095 } 096 } 097 098 /** Register an optimized comparator for a {@link WritableComparable} 099 * implementation. Comparators registered with this method must be 100 * thread-safe. */ 101 public static void define(Class c, WritableComparator comparator) { 102 comparators.put(c, comparator); 103 } 104 105 private final Class<? extends WritableComparable> keyClass; 106 private final WritableComparable key1; 107 private final WritableComparable key2; 108 private final DataInputBuffer buffer; 109 110 protected WritableComparator() { 111 this(null); 112 } 113 114 /** Construct for a {@link WritableComparable} implementation. */ 115 protected WritableComparator(Class<? extends WritableComparable> keyClass) { 116 this(keyClass, null, false); 117 } 118 119 protected WritableComparator(Class<? extends WritableComparable> keyClass, 120 boolean createInstances) { 121 this(keyClass, null, createInstances); 122 } 123 124 protected WritableComparator(Class<? extends WritableComparable> keyClass, 125 Configuration conf, 126 boolean createInstances) { 127 this.keyClass = keyClass; 128 this.conf = (conf != null) ? conf : new Configuration(); 129 if (createInstances) { 130 key1 = newKey(); 131 key2 = newKey(); 132 buffer = new DataInputBuffer(); 133 } else { 134 key1 = key2 = null; 135 buffer = null; 136 } 137 } 138 139 /** Returns the WritableComparable implementation class. */ 140 public Class<? extends WritableComparable> getKeyClass() { return keyClass; } 141 142 /** Construct a new {@link WritableComparable} instance. */ 143 public WritableComparable newKey() { 144 return ReflectionUtils.newInstance(keyClass, conf); 145 } 146 147 /** Optimization hook. Override this to make SequenceFile.Sorter's scream. 148 * 149 * <p>The default implementation reads the data into two {@link 150 * WritableComparable}s (using {@link 151 * Writable#readFields(DataInput)}, then calls {@link 152 * #compare(WritableComparable,WritableComparable)}. 153 */ 154 @Override 155 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 156 try { 157 buffer.reset(b1, s1, l1); // parse key1 158 key1.readFields(buffer); 159 160 buffer.reset(b2, s2, l2); // parse key2 161 key2.readFields(buffer); 162 163 buffer.reset(null, 0, 0); // clean up reference 164 } catch (IOException e) { 165 throw new RuntimeException(e); 166 } 167 168 return compare(key1, key2); // compare them 169 } 170 171 /** Compare two WritableComparables. 172 * 173 * <p> The default implementation uses the natural ordering, calling {@link 174 * Comparable#compareTo(Object)}. */ 175 @SuppressWarnings("unchecked") 176 public int compare(WritableComparable a, WritableComparable b) { 177 return a.compareTo(b); 178 } 179 180 @Override 181 public int compare(Object a, Object b) { 182 return compare((WritableComparable)a, (WritableComparable)b); 183 } 184 185 /** Lexicographic order of binary data. */ 186 public static int compareBytes(byte[] b1, int s1, int l1, 187 byte[] b2, int s2, int l2) { 188 return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2); 189 } 190 191 /** Compute hash for binary data. */ 192 public static int hashBytes(byte[] bytes, int offset, int length) { 193 int hash = 1; 194 for (int i = offset; i < offset + length; i++) 195 hash = (31 * hash) + (int)bytes[i]; 196 return hash; 197 } 198 199 /** Compute hash for binary data. */ 200 public static int hashBytes(byte[] bytes, int length) { 201 return hashBytes(bytes, 0, length); 202 } 203 204 /** Parse an unsigned short from a byte array. */ 205 public static int readUnsignedShort(byte[] bytes, int start) { 206 return (((bytes[start] & 0xff) << 8) + 207 ((bytes[start+1] & 0xff))); 208 } 209 210 /** Parse an integer from a byte array. */ 211 public static int readInt(byte[] bytes, int start) { 212 return (((bytes[start ] & 0xff) << 24) + 213 ((bytes[start+1] & 0xff) << 16) + 214 ((bytes[start+2] & 0xff) << 8) + 215 ((bytes[start+3] & 0xff))); 216 217 } 218 219 /** Parse a float from a byte array. */ 220 public static float readFloat(byte[] bytes, int start) { 221 return Float.intBitsToFloat(readInt(bytes, start)); 222 } 223 224 /** Parse a long from a byte array. */ 225 public static long readLong(byte[] bytes, int start) { 226 return ((long)(readInt(bytes, start)) << 32) + 227 (readInt(bytes, start+4) & 0xFFFFFFFFL); 228 } 229 230 /** Parse a double from a byte array. */ 231 public static double readDouble(byte[] bytes, int start) { 232 return Double.longBitsToDouble(readLong(bytes, start)); 233 } 234 235 /** 236 * Reads a zero-compressed encoded long from a byte array and returns it. 237 * @param bytes byte array with decode long 238 * @param start starting index 239 * @throws java.io.IOException 240 * @return deserialized long 241 */ 242 public static long readVLong(byte[] bytes, int start) throws IOException { 243 int len = bytes[start]; 244 if (len >= -112) { 245 return len; 246 } 247 boolean isNegative = (len < -120); 248 len = isNegative ? -(len + 120) : -(len + 112); 249 if (start+1+len>bytes.length) 250 throw new IOException( 251 "Not enough number of bytes for a zero-compressed integer"); 252 long i = 0; 253 for (int idx = 0; idx < len; idx++) { 254 i = i << 8; 255 i = i | (bytes[start+1+idx] & 0xFF); 256 } 257 return (isNegative ? (i ^ -1L) : i); 258 } 259 260 /** 261 * Reads a zero-compressed encoded integer from a byte array and returns it. 262 * @param bytes byte array with the encoded integer 263 * @param start start index 264 * @throws java.io.IOException 265 * @return deserialized integer 266 */ 267 public static int readVInt(byte[] bytes, int start) throws IOException { 268 return (int) readVLong(bytes, start); 269 } 270}