001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.io; 020 021 import java.io.*; 022 023 import org.apache.hadoop.classification.InterfaceAudience; 024 import org.apache.hadoop.classification.InterfaceStability; 025 import org.apache.hadoop.conf.Configuration; 026 import org.apache.hadoop.util.ReflectionUtils; 027 028 import java.util.zip.GZIPInputStream; 029 import java.util.zip.GZIPOutputStream; 030 031 @InterfaceAudience.Public 032 @InterfaceStability.Stable 033 public final class WritableUtils { 034 035 public static byte[] readCompressedByteArray(DataInput in) throws IOException { 036 int length = in.readInt(); 037 if (length == -1) return null; 038 byte[] buffer = new byte[length]; 039 in.readFully(buffer); // could/should use readFully(buffer,0,length)? 040 GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length)); 041 byte[] outbuf = new byte[length]; 042 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 043 int len; 044 while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){ 045 bos.write(outbuf, 0, len); 046 } 047 byte[] decompressed = bos.toByteArray(); 048 bos.close(); 049 gzi.close(); 050 return decompressed; 051 } 052 053 public static void skipCompressedByteArray(DataInput in) throws IOException { 054 int length = in.readInt(); 055 if (length != -1) { 056 skipFully(in, length); 057 } 058 } 059 060 public static int writeCompressedByteArray(DataOutput out, 061 byte[] bytes) throws IOException { 062 if (bytes != null) { 063 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 064 GZIPOutputStream gzout = new GZIPOutputStream(bos); 065 try { 066 gzout.write(bytes, 0, bytes.length); 067 gzout.close(); 068 gzout = null; 069 } finally { 070 IOUtils.closeStream(gzout); 071 } 072 byte[] buffer = bos.toByteArray(); 073 int len = buffer.length; 074 out.writeInt(len); 075 out.write(buffer, 0, len); 076 /* debug only! Once we have confidence, can lose this. */ 077 return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0); 078 } else { 079 out.writeInt(-1); 080 return -1; 081 } 082 } 083 084 085 /* Ugly utility, maybe someone else can do this better */ 086 public static String readCompressedString(DataInput in) throws IOException { 087 byte[] bytes = readCompressedByteArray(in); 088 if (bytes == null) return null; 089 return new String(bytes, "UTF-8"); 090 } 091 092 093 public static int writeCompressedString(DataOutput out, String s) throws IOException { 094 return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null); 095 } 096 097 /* 098 * 099 * Write a String as a Network Int n, followed by n Bytes 100 * Alternative to 16 bit read/writeUTF. 101 * Encoding standard is... ? 102 * 103 */ 104 public static void writeString(DataOutput out, String s) throws IOException { 105 if (s != null) { 106 byte[] buffer = s.getBytes("UTF-8"); 107 int len = buffer.length; 108 out.writeInt(len); 109 out.write(buffer, 0, len); 110 } else { 111 out.writeInt(-1); 112 } 113 } 114 115 /* 116 * Read a String as a Network Int n, followed by n Bytes 117 * Alternative to 16 bit read/writeUTF. 118 * Encoding standard is... ? 119 * 120 */ 121 public static String readString(DataInput in) throws IOException{ 122 int length = in.readInt(); 123 if (length == -1) return null; 124 byte[] buffer = new byte[length]; 125 in.readFully(buffer); // could/should use readFully(buffer,0,length)? 126 return new String(buffer,"UTF-8"); 127 } 128 129 130 /* 131 * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. 132 * Could be generalised using introspection. 133 * 134 */ 135 public static void writeStringArray(DataOutput out, String[] s) throws IOException{ 136 out.writeInt(s.length); 137 for(int i = 0; i < s.length; i++) { 138 writeString(out, s[i]); 139 } 140 } 141 142 /* 143 * Write a String array as a Nework Int N, followed by Int N Byte Array of 144 * compressed Strings. Handles also null arrays and null values. 145 * Could be generalised using introspection. 146 * 147 */ 148 public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{ 149 if (s == null) { 150 out.writeInt(-1); 151 return; 152 } 153 out.writeInt(s.length); 154 for(int i = 0; i < s.length; i++) { 155 writeCompressedString(out, s[i]); 156 } 157 } 158 159 /* 160 * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. 161 * Could be generalised using introspection. Actually this bit couldn't... 162 * 163 */ 164 public static String[] readStringArray(DataInput in) throws IOException { 165 int len = in.readInt(); 166 if (len == -1) return null; 167 String[] s = new String[len]; 168 for(int i = 0; i < len; i++) { 169 s[i] = readString(in); 170 } 171 return s; 172 } 173 174 175 /* 176 * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. 177 * Could be generalised using introspection. Handles null arrays and null values. 178 * 179 */ 180 public static String[] readCompressedStringArray(DataInput in) throws IOException { 181 int len = in.readInt(); 182 if (len == -1) return null; 183 String[] s = new String[len]; 184 for(int i = 0; i < len; i++) { 185 s[i] = readCompressedString(in); 186 } 187 return s; 188 } 189 190 191 /* 192 * 193 * Test Utility Method Display Byte Array. 194 * 195 */ 196 public static void displayByteArray(byte[] record){ 197 int i; 198 for(i=0;i < record.length -1; i++){ 199 if (i % 16 == 0) { System.out.println(); } 200 System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F)); 201 System.out.print(Integer.toHexString(record[i] & 0x0F)); 202 System.out.print(","); 203 } 204 System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F)); 205 System.out.print(Integer.toHexString(record[i] & 0x0F)); 206 System.out.println(); 207 } 208 209 /** 210 * Make a copy of a writable object using serialization to a buffer. 211 * @param orig The object to copy 212 * @return The copied object 213 */ 214 public static <T extends Writable> T clone(T orig, Configuration conf) { 215 try { 216 @SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T> 217 T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf); 218 ReflectionUtils.copy(conf, orig, newInst); 219 return newInst; 220 } catch (IOException e) { 221 throw new RuntimeException("Error writing/reading clone buffer", e); 222 } 223 } 224 225 /** 226 * Make a copy of the writable object using serialiation to a buffer 227 * @param dst the object to copy from 228 * @param src the object to copy into, which is destroyed 229 * @throws IOException 230 * @deprecated use ReflectionUtils.cloneInto instead. 231 */ 232 @Deprecated 233 public static void cloneInto(Writable dst, Writable src) throws IOException { 234 ReflectionUtils.cloneWritableInto(dst, src); 235 } 236 237 /** 238 * Serializes an integer to a binary stream with zero-compressed encoding. 239 * For -120 <= i <= 127, only one byte is used with the actual value. 240 * For other values of i, the first byte value indicates whether the 241 * integer is positive or negative, and the number of bytes that follow. 242 * If the first byte value v is between -121 and -124, the following integer 243 * is positive, with number of bytes that follow are -(v+120). 244 * If the first byte value v is between -125 and -128, the following integer 245 * is negative, with number of bytes that follow are -(v+124). Bytes are 246 * stored in the high-non-zero-byte-first order. 247 * 248 * @param stream Binary output stream 249 * @param i Integer to be serialized 250 * @throws java.io.IOException 251 */ 252 public static void writeVInt(DataOutput stream, int i) throws IOException { 253 writeVLong(stream, i); 254 } 255 256 /** 257 * Serializes a long to a binary stream with zero-compressed encoding. 258 * For -112 <= i <= 127, only one byte is used with the actual value. 259 * For other values of i, the first byte value indicates whether the 260 * long is positive or negative, and the number of bytes that follow. 261 * If the first byte value v is between -113 and -120, the following long 262 * is positive, with number of bytes that follow are -(v+112). 263 * If the first byte value v is between -121 and -128, the following long 264 * is negative, with number of bytes that follow are -(v+120). Bytes are 265 * stored in the high-non-zero-byte-first order. 266 * 267 * @param stream Binary output stream 268 * @param i Long to be serialized 269 * @throws java.io.IOException 270 */ 271 public static void writeVLong(DataOutput stream, long i) throws IOException { 272 if (i >= -112 && i <= 127) { 273 stream.writeByte((byte)i); 274 return; 275 } 276 277 int len = -112; 278 if (i < 0) { 279 i ^= -1L; // take one's complement' 280 len = -120; 281 } 282 283 long tmp = i; 284 while (tmp != 0) { 285 tmp = tmp >> 8; 286 len--; 287 } 288 289 stream.writeByte((byte)len); 290 291 len = (len < -120) ? -(len + 120) : -(len + 112); 292 293 for (int idx = len; idx != 0; idx--) { 294 int shiftbits = (idx - 1) * 8; 295 long mask = 0xFFL << shiftbits; 296 stream.writeByte((byte)((i & mask) >> shiftbits)); 297 } 298 } 299 300 301 /** 302 * Reads a zero-compressed encoded long from input stream and returns it. 303 * @param stream Binary input stream 304 * @throws java.io.IOException 305 * @return deserialized long from stream. 306 */ 307 public static long readVLong(DataInput stream) throws IOException { 308 byte firstByte = stream.readByte(); 309 int len = decodeVIntSize(firstByte); 310 if (len == 1) { 311 return firstByte; 312 } 313 long i = 0; 314 for (int idx = 0; idx < len-1; idx++) { 315 byte b = stream.readByte(); 316 i = i << 8; 317 i = i | (b & 0xFF); 318 } 319 return (isNegativeVInt(firstByte) ? (i ^ -1L) : i); 320 } 321 322 /** 323 * Reads a zero-compressed encoded integer from input stream and returns it. 324 * @param stream Binary input stream 325 * @throws java.io.IOException 326 * @return deserialized integer from stream. 327 */ 328 public static int readVInt(DataInput stream) throws IOException { 329 long n = readVLong(stream); 330 if ((n > Integer.MAX_VALUE) || (n < Integer.MIN_VALUE)) { 331 throw new IOException("value too long to fit in integer"); 332 } 333 return (int)n; 334 } 335 336 /** 337 * Reads an integer from the input stream and returns it. 338 * 339 * This function validates that the integer is between [lower, upper], 340 * inclusive. 341 * 342 * @param stream Binary input stream 343 * @throws java.io.IOException 344 * @return deserialized integer from stream 345 */ 346 public static int readVIntInRange(DataInput stream, int lower, int upper) 347 throws IOException { 348 long n = readVLong(stream); 349 if (n < lower) { 350 if (lower == 0) { 351 throw new IOException("expected non-negative integer, got " + n); 352 } else { 353 throw new IOException("expected integer greater than or equal to " + 354 lower + ", got " + n); 355 } 356 } 357 if (n > upper) { 358 throw new IOException("expected integer less or equal to " + upper + 359 ", got " + n); 360 } 361 return (int)n; 362 } 363 364 /** 365 * Given the first byte of a vint/vlong, determine the sign 366 * @param value the first byte 367 * @return is the value negative 368 */ 369 public static boolean isNegativeVInt(byte value) { 370 return value < -120 || (value >= -112 && value < 0); 371 } 372 373 /** 374 * Parse the first byte of a vint/vlong to determine the number of bytes 375 * @param value the first byte of the vint/vlong 376 * @return the total number of bytes (1 to 9) 377 */ 378 public static int decodeVIntSize(byte value) { 379 if (value >= -112) { 380 return 1; 381 } else if (value < -120) { 382 return -119 - value; 383 } 384 return -111 - value; 385 } 386 387 /** 388 * Get the encoded length if an integer is stored in a variable-length format 389 * @return the encoded length 390 */ 391 public static int getVIntSize(long i) { 392 if (i >= -112 && i <= 127) { 393 return 1; 394 } 395 396 if (i < 0) { 397 i ^= -1L; // take one's complement' 398 } 399 // find the number of bytes with non-leading zeros 400 int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i); 401 // find the number of data bytes + length byte 402 return (dataBits + 7) / 8 + 1; 403 } 404 /** 405 * Read an Enum value from DataInput, Enums are read and written 406 * using String values. 407 * @param <T> Enum type 408 * @param in DataInput to read from 409 * @param enumType Class type of Enum 410 * @return Enum represented by String read from DataInput 411 * @throws IOException 412 */ 413 public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType) 414 throws IOException{ 415 return T.valueOf(enumType, Text.readString(in)); 416 } 417 /** 418 * writes String value of enum to DataOutput. 419 * @param out Dataoutput stream 420 * @param enumVal enum value 421 * @throws IOException 422 */ 423 public static void writeEnum(DataOutput out, Enum<?> enumVal) 424 throws IOException{ 425 Text.writeString(out, enumVal.name()); 426 } 427 /** 428 * Skip <i>len</i> number of bytes in input stream<i>in</i> 429 * @param in input stream 430 * @param len number of bytes to skip 431 * @throws IOException when skipped less number of bytes 432 */ 433 public static void skipFully(DataInput in, int len) throws IOException { 434 int total = 0; 435 int cur = 0; 436 437 while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) { 438 total += cur; 439 } 440 441 if (total<len) { 442 throw new IOException("Not able to skip " + len + " bytes, possibly " + 443 "due to end of input."); 444 } 445 } 446 447 /** Convert writables to a byte array */ 448 public static byte[] toByteArray(Writable... writables) { 449 final DataOutputBuffer out = new DataOutputBuffer(); 450 try { 451 for(Writable w : writables) { 452 w.write(out); 453 } 454 out.close(); 455 } catch (IOException e) { 456 throw new RuntimeException("Fail to convert writables to a byte array",e); 457 } 458 return out.getData(); 459 } 460 461 /** 462 * Read a string, but check it for sanity. The format consists of a vint 463 * followed by the given number of bytes. 464 * @param in the stream to read from 465 * @param maxLength the largest acceptable length of the encoded string 466 * @return the bytes as a string 467 * @throws IOException if reading from the DataInput fails 468 * @throws IllegalArgumentException if the encoded byte size for string 469 is negative or larger than maxSize. Only the vint is read. 470 */ 471 public static String readStringSafely(DataInput in, 472 int maxLength 473 ) throws IOException, 474 IllegalArgumentException { 475 int length = readVInt(in); 476 if (length < 0 || length > maxLength) { 477 throw new IllegalArgumentException("Encoded byte size for String was " + length + 478 ", which is outside of 0.." + 479 maxLength + " range."); 480 } 481 byte [] bytes = new byte[length]; 482 in.readFully(bytes, 0, length); 483 return Text.decode(bytes); 484 } 485 }