001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.DataInput; 022import java.io.IOException; 023import java.util.concurrent.ConcurrentHashMap; 024 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceStability; 027import org.apache.hadoop.conf.Configurable; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.util.ReflectionUtils; 030 031/** A Comparator for {@link WritableComparable}s. 032 * 033 * <p>This base implemenation uses the natural ordering. To define alternate 034 * orderings, override {@link #compare(WritableComparable,WritableComparable)}. 035 * 036 * <p>One may optimize compare-intensive operations by overriding 037 * {@link #compare(byte[],int,int,byte[],int,int)}. Static utility methods are 038 * provided to assist in optimized implementations of this method. 039 */ 040@InterfaceAudience.Public 041@InterfaceStability.Stable 042public class WritableComparator implements RawComparator, Configurable { 043 044 private static final ConcurrentHashMap<Class, WritableComparator> comparators 045 = new ConcurrentHashMap<Class, WritableComparator>(); // registry 046 047 private Configuration conf; 048 049 /** For backwards compatibility. **/ 050 public static WritableComparator get(Class<? extends WritableComparable> c) { 051 return get(c, null); 052 } 053 054 /** Get a comparator for a {@link WritableComparable} implementation. */ 055 public static WritableComparator get( 056 Class<? extends WritableComparable> c, Configuration conf) { 057 WritableComparator comparator = comparators.get(c); 058 if (comparator == null) { 059 // force the static initializers to run 060 forceInit(c); 061 // look to see if it is defined now 062 comparator = comparators.get(c); 063 // if not, use the generic one 064 if (comparator == null) { 065 comparator = new WritableComparator(c, conf, true); 066 } 067 } 068 // Newly passed Configuration objects should be used. 069 ReflectionUtils.setConf(comparator, conf); 070 return comparator; 071 } 072 073 @Override 074 public void setConf(Configuration conf) { 075 this.conf = conf; 076 } 077 078 @Override 079 public Configuration getConf() { 080 return conf; 081 } 082 083 /** 084 * Force initialization of the static members. 085 * As of Java 5, referencing a class doesn't force it to initialize. Since 086 * this class requires that the classes be initialized to declare their 087 * comparators, we force that initialization to happen. 088 * @param cls the class to initialize 089 */ 090 private static void forceInit(Class<?> cls) { 091 try { 092 Class.forName(cls.getName(), true, cls.getClassLoader()); 093 } catch (ClassNotFoundException e) { 094 throw new IllegalArgumentException("Can't initialize class " + cls, e); 095 } 096 } 097 098 /** Register an optimized comparator for a {@link WritableComparable} 099 * implementation. Comparators registered with this method must be 100 * thread-safe. */ 101 public static void define(Class c, WritableComparator comparator) { 102 comparators.put(c, comparator); 103 } 104 105 private final Class<? extends WritableComparable> keyClass; 106 private final WritableComparable key1; 107 private final WritableComparable key2; 108 private final DataInputBuffer buffer; 109 110 protected WritableComparator() { 111 this(null); 112 } 113 114 /** Construct for a {@link WritableComparable} implementation. */ 115 protected WritableComparator(Class<? extends WritableComparable> keyClass) { 116 this(keyClass, null, false); 117 } 118 119 protected WritableComparator(Class<? extends WritableComparable> keyClass, 120 boolean createInstances) { 121 this(keyClass, null, createInstances); 122 } 123 124 protected WritableComparator(Class<? extends WritableComparable> keyClass, 125 Configuration conf, 126 boolean createInstances) { 127 this.keyClass = keyClass; 128 this.conf = (conf != null) ? conf : new Configuration(); 129 if (createInstances) { 130 key1 = newKey(); 131 key2 = newKey(); 132 buffer = new DataInputBuffer(); 133 } else { 134 key1 = key2 = null; 135 buffer = null; 136 } 137 } 138 139 /** Returns the WritableComparable implementation class. */ 140 public Class<? extends WritableComparable> getKeyClass() { return keyClass; } 141 142 /** Construct a new {@link WritableComparable} instance. */ 143 public WritableComparable newKey() { 144 return ReflectionUtils.newInstance(keyClass, conf); 145 } 146 147 /** Optimization hook. Override this to make SequenceFile.Sorter's scream. 148 * 149 * <p>The default implementation reads the data into two {@link 150 * WritableComparable}s (using {@link 151 * Writable#readFields(DataInput)}, then calls {@link 152 * #compare(WritableComparable,WritableComparable)}. 153 */ 154 @Override 155 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { 156 try { 157 buffer.reset(b1, s1, l1); // parse key1 158 key1.readFields(buffer); 159 160 buffer.reset(b2, s2, l2); // parse key2 161 key2.readFields(buffer); 162 163 } catch (IOException e) { 164 throw new RuntimeException(e); 165 } 166 167 return compare(key1, key2); // compare them 168 } 169 170 /** Compare two WritableComparables. 171 * 172 * <p> The default implementation uses the natural ordering, calling {@link 173 * Comparable#compareTo(Object)}. */ 174 @SuppressWarnings("unchecked") 175 public int compare(WritableComparable a, WritableComparable b) { 176 return a.compareTo(b); 177 } 178 179 @Override 180 public int compare(Object a, Object b) { 181 return compare((WritableComparable)a, (WritableComparable)b); 182 } 183 184 /** Lexicographic order of binary data. */ 185 public static int compareBytes(byte[] b1, int s1, int l1, 186 byte[] b2, int s2, int l2) { 187 return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2); 188 } 189 190 /** Compute hash for binary data. */ 191 public static int hashBytes(byte[] bytes, int offset, int length) { 192 int hash = 1; 193 for (int i = offset; i < offset + length; i++) 194 hash = (31 * hash) + (int)bytes[i]; 195 return hash; 196 } 197 198 /** Compute hash for binary data. */ 199 public static int hashBytes(byte[] bytes, int length) { 200 return hashBytes(bytes, 0, length); 201 } 202 203 /** Parse an unsigned short from a byte array. */ 204 public static int readUnsignedShort(byte[] bytes, int start) { 205 return (((bytes[start] & 0xff) << 8) + 206 ((bytes[start+1] & 0xff))); 207 } 208 209 /** Parse an integer from a byte array. */ 210 public static int readInt(byte[] bytes, int start) { 211 return (((bytes[start ] & 0xff) << 24) + 212 ((bytes[start+1] & 0xff) << 16) + 213 ((bytes[start+2] & 0xff) << 8) + 214 ((bytes[start+3] & 0xff))); 215 216 } 217 218 /** Parse a float from a byte array. */ 219 public static float readFloat(byte[] bytes, int start) { 220 return Float.intBitsToFloat(readInt(bytes, start)); 221 } 222 223 /** Parse a long from a byte array. */ 224 public static long readLong(byte[] bytes, int start) { 225 return ((long)(readInt(bytes, start)) << 32) + 226 (readInt(bytes, start+4) & 0xFFFFFFFFL); 227 } 228 229 /** Parse a double from a byte array. */ 230 public static double readDouble(byte[] bytes, int start) { 231 return Double.longBitsToDouble(readLong(bytes, start)); 232 } 233 234 /** 235 * Reads a zero-compressed encoded long from a byte array and returns it. 236 * @param bytes byte array with decode long 237 * @param start starting index 238 * @throws java.io.IOException 239 * @return deserialized long 240 */ 241 public static long readVLong(byte[] bytes, int start) throws IOException { 242 int len = bytes[start]; 243 if (len >= -112) { 244 return len; 245 } 246 boolean isNegative = (len < -120); 247 len = isNegative ? -(len + 120) : -(len + 112); 248 if (start+1+len>bytes.length) 249 throw new IOException( 250 "Not enough number of bytes for a zero-compressed integer"); 251 long i = 0; 252 for (int idx = 0; idx < len; idx++) { 253 i = i << 8; 254 i = i | (bytes[start+1+idx] & 0xFF); 255 } 256 return (isNegative ? (i ^ -1L) : i); 257 } 258 259 /** 260 * Reads a zero-compressed encoded integer from a byte array and returns it. 261 * @param bytes byte array with the encoded integer 262 * @param start start index 263 * @throws java.io.IOException 264 * @return deserialized integer 265 */ 266 public static int readVInt(byte[] bytes, int start) throws IOException { 267 return (int) readVLong(bytes, start); 268 } 269}