/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.ReflectionUtils;

/** A Comparator for {@link WritableComparable}s.
 *
 * <p>This base implementation uses the natural ordering. To define alternate
 * orderings, override {@link #compare(WritableComparable,WritableComparable)}.
 *
 * <p>One may optimize compare-intensive operations by overriding
 * {@link #compare(byte[],int,int,byte[],int,int)}. Static utility methods are
 * provided to assist in optimized implementations of this method.
 *
 * <p>An instance created with {@code createInstances == true} reuses the same
 * two key objects and the same input buffer on every call to
 * {@link #compare(byte[],int,int,byte[],int,int)}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class WritableComparator implements RawComparator {

  /** Registry of comparators, keyed by the class they compare. */
  private static final ConcurrentHashMap<Class, WritableComparator> comparators
          = new ConcurrentHashMap<Class, WritableComparator>();

  /**
   * Get a comparator for a {@link WritableComparable} implementation.
   *
   * <p>If no comparator is registered for {@code c}, the class is first
   * initialized (its static initializer may call
   * {@link #define(Class, WritableComparator)}) and the registry is consulted
   * again. If there is still no entry, a new generic comparator that
   * deserializes keys of type {@code c} is returned; that generic instance is
   * not stored in the registry.
   *
   * @param c the key class to obtain a comparator for
   * @return the registered comparator, or a freshly created generic one
   */
  public static WritableComparator get(Class<? extends WritableComparable> c) {
    WritableComparator comparator = comparators.get(c);
    if (comparator == null) {
      // force the static initializers to run
      forceInit(c);
      // look to see if it is defined now
      comparator = comparators.get(c);
      // if not, use the generic one
      if (comparator == null) {
        comparator = new WritableComparator(c, true);
      }
    }
    return comparator;
  }

  /**
   * Force initialization of the static members.
   * As of Java 5, referencing a class doesn't force it to initialize. Since
   * this class requires that the classes be initialized to declare their
   * comparators, we force that initialization to happen.
   *
   * @param cls the class to initialize
   * @throws IllegalArgumentException if the class cannot be found through its
   *         own class loader
   */
  private static void forceInit(Class<?> cls) {
    try {
      Class.forName(cls.getName(), true, cls.getClassLoader());
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Can't initialize class " + cls, e);
    }
  }

  /**
   * Register an optimized comparator for a {@link WritableComparable}
   * implementation. Comparators registered with this method must be
   * thread-safe.
   *
   * @param c the key class the comparator handles
   * @param comparator the comparator to return from {@link #get(Class)} for
   *        {@code c}; replaces any previously registered comparator
   */
  public static void define(Class c, WritableComparator comparator) {
    comparators.put(c, comparator);
  }

  private final Class<? extends WritableComparable> keyClass;
  // Scratch objects used by the default raw compare; all three are null
  // unless the comparator was constructed with createInstances == true.
  private final WritableComparable key1;
  private final WritableComparable key2;
  private final DataInputBuffer buffer;

  /** Construct with no key class and without scratch key instances. */
  protected WritableComparator() {
    this(null);
  }

  /**
   * Construct for a {@link WritableComparable} implementation, without
   * creating scratch key instances.
   *
   * @param keyClass the key class this comparator handles
   */
  protected WritableComparator(Class<? extends WritableComparable> keyClass) {
    this(keyClass, false);
  }

  /**
   * Construct for a {@link WritableComparable} implementation.
   *
   * @param keyClass the key class this comparator handles
   * @param createInstances if {@code true}, allocate the two key instances and
   *        the buffer needed by the default
   *        {@link #compare(byte[],int,int,byte[],int,int)}; if {@code false},
   *        those fields stay {@code null} and the default raw compare cannot
   *        be used
   */
  protected WritableComparator(Class<? extends WritableComparable> keyClass,
                               boolean createInstances) {
    this.keyClass = keyClass;
    if (createInstances) {
      key1 = newKey();
      key2 = newKey();
      buffer = new DataInputBuffer();
    } else {
      key1 = key2 = null;
      buffer = null;
    }
  }

  /** Returns the WritableComparable implementation class. */
  public Class<? extends WritableComparable> getKeyClass() { return keyClass; }

  /** Construct a new {@link WritableComparable} instance. */
  public WritableComparable newKey() {
    return ReflectionUtils.newInstance(keyClass, null);
  }

  /** Optimization hook. Override this to make SequenceFile.Sorter's scream.
   *
   * <p>The default implementation reads the data into two {@link
   * WritableComparable}s (using {@link
   * Writable#readFields(DataInput)}), then calls {@link
   * #compare(WritableComparable,WritableComparable)}.
   *
   * <p>The default implementation requires that this comparator was
   * constructed with {@code createInstances == true}; otherwise the scratch
   * keys and buffer are {@code null}.
   *
   * @param b1 array holding the first serialized key
   * @param s1 offset of the first key in {@code b1}
   * @param l1 length in bytes of the first key
   * @param b2 array holding the second serialized key
   * @param s2 offset of the second key in {@code b2}
   * @param l2 length in bytes of the second key
   * @return the result of comparing the two deserialized keys
   * @throws RuntimeException wrapping any {@link IOException} raised while
   *         deserializing a key
   */
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      buffer.reset(b1, s1, l1);                   // parse key1
      key1.readFields(buffer);

      buffer.reset(b2, s2, l2);                   // parse key2
      key2.readFields(buffer);

      // Drop the reference to the caller's array so this comparator does not
      // keep it reachable after the call returns.
      buffer.reset(null, 0, 0);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return compare(key1, key2);                   // compare them
  }

  /** Compare two WritableComparables.
   *
   * <p>The default implementation uses the natural ordering, calling {@link
   * Comparable#compareTo(Object)}.
   *
   * @param a the first key
   * @param b the second key
   * @return the value of {@code a.compareTo(b)}
   */
  @SuppressWarnings("unchecked")
  public int compare(WritableComparable a, WritableComparable b) {
    return a.compareTo(b);
  }

  /**
   * Compare two objects by casting both to {@link WritableComparable} and
   * delegating to {@link #compare(WritableComparable,WritableComparable)}.
   */
  @Override
  public int compare(Object a, Object b) {
    return compare((WritableComparable)a, (WritableComparable)b);
  }

  /** Lexicographic order of binary data. */
  public static int compareBytes(byte[] b1, int s1, int l1,
                                 byte[] b2, int s2, int l2) {
    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
  }

  /**
   * Compute hash for binary data.
   *
   * <p>Starts from 1 and, for each byte in the range, multiplies the running
   * hash by 31 and adds the (signed) byte value.
   *
   * @param bytes the array to hash
   * @param offset index of the first byte to include
   * @param length number of bytes to include
   * @return the hash of {@code bytes[offset .. offset+length-1]}
   */
  public static int hashBytes(byte[] bytes, int offset, int length) {
    int hash = 1;
    for (int i = offset; i < offset + length; i++) {
      hash = (31 * hash) + (int)bytes[i];
    }
    return hash;
  }

  /**
   * Compute hash for binary data, starting at index 0.
   *
   * @param bytes the array to hash
   * @param length number of leading bytes to include
   * @return the hash of the first {@code length} bytes
   */
  public static int hashBytes(byte[] bytes, int length) {
    return hashBytes(bytes, 0, length);
  }

  /** Parse an unsigned short (two bytes, high byte first) from a byte array. */
  public static int readUnsignedShort(byte[] bytes, int start) {
    return (((bytes[start]   & 0xff) <<  8) +
            ((bytes[start+1] & 0xff)));
  }

  /** Parse an integer (four bytes, high byte first) from a byte array. */
  public static int readInt(byte[] bytes, int start) {
    return (((bytes[start  ] & 0xff) << 24) +
            ((bytes[start+1] & 0xff) << 16) +
            ((bytes[start+2] & 0xff) <<  8) +
            ((bytes[start+3] & 0xff)));

  }

  /** Parse a float from a byte array, interpreting four bytes as its bits. */
  public static float readFloat(byte[] bytes, int start) {
    return Float.intBitsToFloat(readInt(bytes, start));
  }

  /** Parse a long (eight bytes, high byte first) from a byte array. */
  public static long readLong(byte[] bytes, int start) {
    // The low word is masked so its sign bit does not carry into the high word.
    return ((long)(readInt(bytes, start)) << 32) +
      (readInt(bytes, start+4) & 0xFFFFFFFFL);
  }

  /** Parse a double from a byte array, interpreting eight bytes as its bits. */
  public static double readDouble(byte[] bytes, int start) {
    return Double.longBitsToDouble(readLong(bytes, start));
  }

  /**
   * Reads a zero-compressed encoded long from a byte array and returns it.
   *
   * <p>If the first byte is at least -112 it is the value itself. Otherwise
   * the first byte encodes both the sign and how many bytes follow: values
   * below -120 mark a negative number with {@code -(first + 120)} payload
   * bytes, the rest mark a non-negative number with {@code -(first + 112)}
   * payload bytes. The payload is read high byte first, and for negative
   * numbers every bit of the result is inverted.
   *
   * @param bytes byte array containing the encoded long
   * @param start index of the first byte of the encoded value
   * @throws java.io.IOException if the array is too short to hold the number
   *         of payload bytes announced by the first byte
   * @return deserialized long
   */
  public static long readVLong(byte[] bytes, int start) throws IOException {
    int len = bytes[start];
    if (len >= -112) {
      return len;
    }
    boolean isNegative = (len < -120);
    len = isNegative ? -(len + 120) : -(len + 112);
    if (start+1+len>bytes.length) {
      throw new IOException(
                            "Not enough number of bytes for a zero-compressed integer");
    }
    long i = 0;
    for (int idx = 0; idx < len; idx++) {
      i = i << 8;
      i = i | (bytes[start+1+idx] & 0xFF);
    }
    return (isNegative ? (i ^ -1L) : i);
  }

  /**
   * Reads a zero-compressed encoded integer from a byte array and returns it.
   * The value is decoded with {@link #readVLong(byte[], int)} and narrowed to
   * {@code int}.
   *
   * @param bytes byte array with the encoded integer
   * @param start start index
   * @throws java.io.IOException if the array is too short for the encoded
   *         value
   * @return deserialized integer
   */
  public static int readVInt(byte[] bytes, int start) throws IOException {
    return (int) readVLong(bytes, start);
  }
}