001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.*; 022 023import org.apache.hadoop.classification.InterfaceAudience; 024import org.apache.hadoop.classification.InterfaceStability; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.util.ReflectionUtils; 027 028import java.util.zip.GZIPInputStream; 029import java.util.zip.GZIPOutputStream; 030 031@InterfaceAudience.Public 032@InterfaceStability.Stable 033public final class WritableUtils { 034 035 public static byte[] readCompressedByteArray(DataInput in) throws IOException { 036 int length = in.readInt(); 037 if (length == -1) return null; 038 byte[] buffer = new byte[length]; 039 in.readFully(buffer); // could/should use readFully(buffer,0,length)? 040 GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length)); 041 byte[] outbuf = new byte[length]; 042 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 043 int len; 044 while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){ 045 bos.write(outbuf, 0, len); 046 } 047 byte[] decompressed = bos.toByteArray(); 048 bos.close(); 049 gzi.close(); 050 return decompressed; 051 } 052 053 public static void skipCompressedByteArray(DataInput in) throws IOException { 054 int length = in.readInt(); 055 if (length != -1) { 056 skipFully(in, length); 057 } 058 } 059 060 public static int writeCompressedByteArray(DataOutput out, 061 byte[] bytes) throws IOException { 062 if (bytes != null) { 063 ByteArrayOutputStream bos = new ByteArrayOutputStream(); 064 GZIPOutputStream gzout = new GZIPOutputStream(bos); 065 try { 066 gzout.write(bytes, 0, bytes.length); 067 gzout.close(); 068 gzout = null; 069 } finally { 070 IOUtils.closeStream(gzout); 071 } 072 byte[] buffer = bos.toByteArray(); 073 int len = buffer.length; 074 out.writeInt(len); 075 out.write(buffer, 0, len); 076 /* debug only! Once we have confidence, can lose this. */ 077 return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0); 078 } else { 079 out.writeInt(-1); 080 return -1; 081 } 082 } 083 084 085 /* Ugly utility, maybe someone else can do this better */ 086 public static String readCompressedString(DataInput in) throws IOException { 087 byte[] bytes = readCompressedByteArray(in); 088 if (bytes == null) return null; 089 return new String(bytes, "UTF-8"); 090 } 091 092 093 public static int writeCompressedString(DataOutput out, String s) throws IOException { 094 return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null); 095 } 096 097 /* 098 * 099 * Write a String as a Network Int n, followed by n Bytes 100 * Alternative to 16 bit read/writeUTF. 101 * Encoding standard is... ? 102 * 103 */ 104 public static void writeString(DataOutput out, String s) throws IOException { 105 if (s != null) { 106 byte[] buffer = s.getBytes("UTF-8"); 107 int len = buffer.length; 108 out.writeInt(len); 109 out.write(buffer, 0, len); 110 } else { 111 out.writeInt(-1); 112 } 113 } 114 115 /* 116 * Read a String as a Network Int n, followed by n Bytes 117 * Alternative to 16 bit read/writeUTF. 118 * Encoding standard is... ? 119 * 120 */ 121 public static String readString(DataInput in) throws IOException{ 122 int length = in.readInt(); 123 if (length == -1) return null; 124 byte[] buffer = new byte[length]; 125 in.readFully(buffer); // could/should use readFully(buffer,0,length)? 126 return new String(buffer,"UTF-8"); 127 } 128 129 130 /* 131 * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. 132 * Could be generalised using introspection. 133 * 134 */ 135 public static void writeStringArray(DataOutput out, String[] s) throws IOException{ 136 out.writeInt(s.length); 137 for(int i = 0; i < s.length; i++) { 138 writeString(out, s[i]); 139 } 140 } 141 142 /* 143 * Write a String array as a Nework Int N, followed by Int N Byte Array of 144 * compressed Strings. Handles also null arrays and null values. 145 * Could be generalised using introspection. 146 * 147 */ 148 public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{ 149 if (s == null) { 150 out.writeInt(-1); 151 return; 152 } 153 out.writeInt(s.length); 154 for(int i = 0; i < s.length; i++) { 155 writeCompressedString(out, s[i]); 156 } 157 } 158 159 /* 160 * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. 161 * Could be generalised using introspection. Actually this bit couldn't... 162 * 163 */ 164 public static String[] readStringArray(DataInput in) throws IOException { 165 int len = in.readInt(); 166 if (len == -1) return null; 167 String[] s = new String[len]; 168 for(int i = 0; i < len; i++) { 169 s[i] = readString(in); 170 } 171 return s; 172 } 173 174 175 /* 176 * Write a String array as a Nework Int N, followed by Int N Byte Array Strings. 177 * Could be generalised using introspection. Handles null arrays and null values. 178 * 179 */ 180 public static String[] readCompressedStringArray(DataInput in) throws IOException { 181 int len = in.readInt(); 182 if (len == -1) return null; 183 String[] s = new String[len]; 184 for(int i = 0; i < len; i++) { 185 s[i] = readCompressedString(in); 186 } 187 return s; 188 } 189 190 191 /* 192 * 193 * Test Utility Method Display Byte Array. 194 * 195 */ 196 public static void displayByteArray(byte[] record){ 197 int i; 198 for(i=0;i < record.length -1; i++){ 199 if (i % 16 == 0) { System.out.println(); } 200 System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F)); 201 System.out.print(Integer.toHexString(record[i] & 0x0F)); 202 System.out.print(","); 203 } 204 System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F)); 205 System.out.print(Integer.toHexString(record[i] & 0x0F)); 206 System.out.println(); 207 } 208 209 /** 210 * Make a copy of a writable object using serialization to a buffer. 211 * @param orig The object to copy 212 * @return The copied object 213 */ 214 public static <T extends Writable> T clone(T orig, Configuration conf) { 215 try { 216 @SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T> 217 T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf); 218 ReflectionUtils.copy(conf, orig, newInst); 219 return newInst; 220 } catch (IOException e) { 221 throw new RuntimeException("Error writing/reading clone buffer", e); 222 } 223 } 224 225 /** 226 * Make a copy of the writable object using serialiation to a buffer 227 * @param dst the object to copy from 228 * @param src the object to copy into, which is destroyed 229 * @throws IOException 230 * @deprecated use ReflectionUtils.cloneInto instead. 231 */ 232 @Deprecated 233 public static void cloneInto(Writable dst, Writable src) throws IOException { 234 ReflectionUtils.cloneWritableInto(dst, src); 235 } 236 237 /** 238 * Serializes an integer to a binary stream with zero-compressed encoding. 239 * For -120 <= i <= 127, only one byte is used with the actual value. 240 * For other values of i, the first byte value indicates whether the 241 * integer is positive or negative, and the number of bytes that follow. 242 * If the first byte value v is between -121 and -124, the following integer 243 * is positive, with number of bytes that follow are -(v+120). 244 * If the first byte value v is between -125 and -128, the following integer 245 * is negative, with number of bytes that follow are -(v+124). Bytes are 246 * stored in the high-non-zero-byte-first order. 247 * 248 * @param stream Binary output stream 249 * @param i Integer to be serialized 250 * @throws java.io.IOException 251 */ 252 public static void writeVInt(DataOutput stream, int i) throws IOException { 253 writeVLong(stream, i); 254 } 255 256 /** 257 * Serializes a long to a binary stream with zero-compressed encoding. 258 * For -112 <= i <= 127, only one byte is used with the actual value. 259 * For other values of i, the first byte value indicates whether the 260 * long is positive or negative, and the number of bytes that follow. 261 * If the first byte value v is between -113 and -120, the following long 262 * is positive, with number of bytes that follow are -(v+112). 263 * If the first byte value v is between -121 and -128, the following long 264 * is negative, with number of bytes that follow are -(v+120). Bytes are 265 * stored in the high-non-zero-byte-first order. 266 * 267 * @param stream Binary output stream 268 * @param i Long to be serialized 269 * @throws java.io.IOException 270 */ 271 public static void writeVLong(DataOutput stream, long i) throws IOException { 272 if (i >= -112 && i <= 127) { 273 stream.writeByte((byte)i); 274 return; 275 } 276 277 int len = -112; 278 if (i < 0) { 279 i ^= -1L; // take one's complement' 280 len = -120; 281 } 282 283 long tmp = i; 284 while (tmp != 0) { 285 tmp = tmp >> 8; 286 len--; 287 } 288 289 stream.writeByte((byte)len); 290 291 len = (len < -120) ? -(len + 120) : -(len + 112); 292 293 for (int idx = len; idx != 0; idx--) { 294 int shiftbits = (idx - 1) * 8; 295 long mask = 0xFFL << shiftbits; 296 stream.writeByte((byte)((i & mask) >> shiftbits)); 297 } 298 } 299 300 301 /** 302 * Reads a zero-compressed encoded long from input stream and returns it. 303 * @param stream Binary input stream 304 * @throws java.io.IOException 305 * @return deserialized long from stream. 306 */ 307 public static long readVLong(DataInput stream) throws IOException { 308 byte firstByte = stream.readByte(); 309 int len = decodeVIntSize(firstByte); 310 if (len == 1) { 311 return firstByte; 312 } 313 long i = 0; 314 for (int idx = 0; idx < len-1; idx++) { 315 byte b = stream.readByte(); 316 i = i << 8; 317 i = i | (b & 0xFF); 318 } 319 return (isNegativeVInt(firstByte) ? (i ^ -1L) : i); 320 } 321 322 /** 323 * Reads a zero-compressed encoded integer from input stream and returns it. 324 * @param stream Binary input stream 325 * @throws java.io.IOException 326 * @return deserialized integer from stream. 327 */ 328 public static int readVInt(DataInput stream) throws IOException { 329 return (int) readVLong(stream); 330 } 331 332 /** 333 * Given the first byte of a vint/vlong, determine the sign 334 * @param value the first byte 335 * @return is the value negative 336 */ 337 public static boolean isNegativeVInt(byte value) { 338 return value < -120 || (value >= -112 && value < 0); 339 } 340 341 /** 342 * Parse the first byte of a vint/vlong to determine the number of bytes 343 * @param value the first byte of the vint/vlong 344 * @return the total number of bytes (1 to 9) 345 */ 346 public static int decodeVIntSize(byte value) { 347 if (value >= -112) { 348 return 1; 349 } else if (value < -120) { 350 return -119 - value; 351 } 352 return -111 - value; 353 } 354 355 /** 356 * Get the encoded length if an integer is stored in a variable-length format 357 * @return the encoded length 358 */ 359 public static int getVIntSize(long i) { 360 if (i >= -112 && i <= 127) { 361 return 1; 362 } 363 364 if (i < 0) { 365 i ^= -1L; // take one's complement' 366 } 367 // find the number of bytes with non-leading zeros 368 int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i); 369 // find the number of data bytes + length byte 370 return (dataBits + 7) / 8 + 1; 371 } 372 /** 373 * Read an Enum value from DataInput, Enums are read and written 374 * using String values. 375 * @param <T> Enum type 376 * @param in DataInput to read from 377 * @param enumType Class type of Enum 378 * @return Enum represented by String read from DataInput 379 * @throws IOException 380 */ 381 public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType) 382 throws IOException{ 383 return T.valueOf(enumType, Text.readString(in)); 384 } 385 /** 386 * writes String value of enum to DataOutput. 387 * @param out Dataoutput stream 388 * @param enumVal enum value 389 * @throws IOException 390 */ 391 public static void writeEnum(DataOutput out, Enum<?> enumVal) 392 throws IOException{ 393 Text.writeString(out, enumVal.name()); 394 } 395 /** 396 * Skip <i>len</i> number of bytes in input stream<i>in</i> 397 * @param in input stream 398 * @param len number of bytes to skip 399 * @throws IOException when skipped less number of bytes 400 */ 401 public static void skipFully(DataInput in, int len) throws IOException { 402 int total = 0; 403 int cur = 0; 404 405 while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) { 406 total += cur; 407 } 408 409 if (total<len) { 410 throw new IOException("Not able to skip " + len + " bytes, possibly " + 411 "due to end of input."); 412 } 413 } 414 415 /** Convert writables to a byte array */ 416 public static byte[] toByteArray(Writable... writables) { 417 final DataOutputBuffer out = new DataOutputBuffer(); 418 try { 419 for(Writable w : writables) { 420 w.write(out); 421 } 422 out.close(); 423 } catch (IOException e) { 424 throw new RuntimeException("Fail to convert writables to a byte array",e); 425 } 426 return out.getData(); 427 } 428 429 /** 430 * Read a string, but check it for sanity. The format consists of a vint 431 * followed by the given number of bytes. 432 * @param in the stream to read from 433 * @param maxLength the largest acceptable length of the encoded string 434 * @return the bytes as a string 435 * @throws IOException if reading from the DataInput fails 436 * @throws IllegalArgumentException if the encoded byte size for string 437 is negative or larger than maxSize. Only the vint is read. 438 */ 439 public static String readStringSafely(DataInput in, 440 int maxLength 441 ) throws IOException, 442 IllegalArgumentException { 443 int length = readVInt(in); 444 if (length < 0 || length > maxLength) { 445 throw new IllegalArgumentException("Encoded byte size for String was " + length + 446 ", which is outside of 0.." + 447 maxLength + " range."); 448 } 449 byte [] bytes = new byte[length]; 450 in.readFully(bytes, 0, length); 451 return Text.decode(bytes); 452 } 453}