001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.io.*;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.conf.Configuration;
026 import org.apache.hadoop.util.ReflectionUtils;
027
028 import java.util.zip.GZIPInputStream;
029 import java.util.zip.GZIPOutputStream;
030
031 @InterfaceAudience.Public
032 @InterfaceStability.Stable
033 public final class WritableUtils {
034
035 public static byte[] readCompressedByteArray(DataInput in) throws IOException {
036 int length = in.readInt();
037 if (length == -1) return null;
038 byte[] buffer = new byte[length];
039 in.readFully(buffer); // could/should use readFully(buffer,0,length)?
040 GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
041 byte[] outbuf = new byte[length];
042 ByteArrayOutputStream bos = new ByteArrayOutputStream();
043 int len;
044 while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){
045 bos.write(outbuf, 0, len);
046 }
047 byte[] decompressed = bos.toByteArray();
048 bos.close();
049 gzi.close();
050 return decompressed;
051 }
052
053 public static void skipCompressedByteArray(DataInput in) throws IOException {
054 int length = in.readInt();
055 if (length != -1) {
056 skipFully(in, length);
057 }
058 }
059
060 public static int writeCompressedByteArray(DataOutput out,
061 byte[] bytes) throws IOException {
062 if (bytes != null) {
063 ByteArrayOutputStream bos = new ByteArrayOutputStream();
064 GZIPOutputStream gzout = new GZIPOutputStream(bos);
065 try {
066 gzout.write(bytes, 0, bytes.length);
067 gzout.close();
068 gzout = null;
069 } finally {
070 IOUtils.closeStream(gzout);
071 }
072 byte[] buffer = bos.toByteArray();
073 int len = buffer.length;
074 out.writeInt(len);
075 out.write(buffer, 0, len);
076 /* debug only! Once we have confidence, can lose this. */
077 return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0);
078 } else {
079 out.writeInt(-1);
080 return -1;
081 }
082 }
083
084
085 /* Ugly utility, maybe someone else can do this better */
086 public static String readCompressedString(DataInput in) throws IOException {
087 byte[] bytes = readCompressedByteArray(in);
088 if (bytes == null) return null;
089 return new String(bytes, "UTF-8");
090 }
091
092
093 public static int writeCompressedString(DataOutput out, String s) throws IOException {
094 return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
095 }
096
097 /*
098 *
099 * Write a String as a Network Int n, followed by n Bytes
100 * Alternative to 16 bit read/writeUTF.
101 * Encoding standard is... ?
102 *
103 */
104 public static void writeString(DataOutput out, String s) throws IOException {
105 if (s != null) {
106 byte[] buffer = s.getBytes("UTF-8");
107 int len = buffer.length;
108 out.writeInt(len);
109 out.write(buffer, 0, len);
110 } else {
111 out.writeInt(-1);
112 }
113 }
114
115 /*
116 * Read a String as a Network Int n, followed by n Bytes
117 * Alternative to 16 bit read/writeUTF.
118 * Encoding standard is... ?
119 *
120 */
121 public static String readString(DataInput in) throws IOException{
122 int length = in.readInt();
123 if (length == -1) return null;
124 byte[] buffer = new byte[length];
125 in.readFully(buffer); // could/should use readFully(buffer,0,length)?
126 return new String(buffer,"UTF-8");
127 }
128
129
130 /*
131 * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
132 * Could be generalised using introspection.
133 *
134 */
135 public static void writeStringArray(DataOutput out, String[] s) throws IOException{
136 out.writeInt(s.length);
137 for(int i = 0; i < s.length; i++) {
138 writeString(out, s[i]);
139 }
140 }
141
142 /*
143 * Write a String array as a Nework Int N, followed by Int N Byte Array of
144 * compressed Strings. Handles also null arrays and null values.
145 * Could be generalised using introspection.
146 *
147 */
148 public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{
149 if (s == null) {
150 out.writeInt(-1);
151 return;
152 }
153 out.writeInt(s.length);
154 for(int i = 0; i < s.length; i++) {
155 writeCompressedString(out, s[i]);
156 }
157 }
158
159 /*
160 * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
161 * Could be generalised using introspection. Actually this bit couldn't...
162 *
163 */
164 public static String[] readStringArray(DataInput in) throws IOException {
165 int len = in.readInt();
166 if (len == -1) return null;
167 String[] s = new String[len];
168 for(int i = 0; i < len; i++) {
169 s[i] = readString(in);
170 }
171 return s;
172 }
173
174
175 /*
176 * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
177 * Could be generalised using introspection. Handles null arrays and null values.
178 *
179 */
180 public static String[] readCompressedStringArray(DataInput in) throws IOException {
181 int len = in.readInt();
182 if (len == -1) return null;
183 String[] s = new String[len];
184 for(int i = 0; i < len; i++) {
185 s[i] = readCompressedString(in);
186 }
187 return s;
188 }
189
190
191 /*
192 *
193 * Test Utility Method Display Byte Array.
194 *
195 */
196 public static void displayByteArray(byte[] record){
197 int i;
198 for(i=0;i < record.length -1; i++){
199 if (i % 16 == 0) { System.out.println(); }
200 System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
201 System.out.print(Integer.toHexString(record[i] & 0x0F));
202 System.out.print(",");
203 }
204 System.out.print(Integer.toHexString(record[i] >> 4 & 0x0F));
205 System.out.print(Integer.toHexString(record[i] & 0x0F));
206 System.out.println();
207 }
208
209 /**
210 * Make a copy of a writable object using serialization to a buffer.
211 * @param orig The object to copy
212 * @return The copied object
213 */
214 public static <T extends Writable> T clone(T orig, Configuration conf) {
215 try {
216 @SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T>
217 T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf);
218 ReflectionUtils.copy(conf, orig, newInst);
219 return newInst;
220 } catch (IOException e) {
221 throw new RuntimeException("Error writing/reading clone buffer", e);
222 }
223 }
224
225 /**
226 * Make a copy of the writable object using serialiation to a buffer
227 * @param dst the object to copy from
228 * @param src the object to copy into, which is destroyed
229 * @throws IOException
230 * @deprecated use ReflectionUtils.cloneInto instead.
231 */
232 @Deprecated
233 public static void cloneInto(Writable dst, Writable src) throws IOException {
234 ReflectionUtils.cloneWritableInto(dst, src);
235 }
236
237 /**
238 * Serializes an integer to a binary stream with zero-compressed encoding.
239 * For -120 <= i <= 127, only one byte is used with the actual value.
240 * For other values of i, the first byte value indicates whether the
241 * integer is positive or negative, and the number of bytes that follow.
242 * If the first byte value v is between -121 and -124, the following integer
243 * is positive, with number of bytes that follow are -(v+120).
244 * If the first byte value v is between -125 and -128, the following integer
245 * is negative, with number of bytes that follow are -(v+124). Bytes are
246 * stored in the high-non-zero-byte-first order.
247 *
248 * @param stream Binary output stream
249 * @param i Integer to be serialized
250 * @throws java.io.IOException
251 */
252 public static void writeVInt(DataOutput stream, int i) throws IOException {
253 writeVLong(stream, i);
254 }
255
256 /**
257 * Serializes a long to a binary stream with zero-compressed encoding.
258 * For -112 <= i <= 127, only one byte is used with the actual value.
259 * For other values of i, the first byte value indicates whether the
260 * long is positive or negative, and the number of bytes that follow.
261 * If the first byte value v is between -113 and -120, the following long
262 * is positive, with number of bytes that follow are -(v+112).
263 * If the first byte value v is between -121 and -128, the following long
264 * is negative, with number of bytes that follow are -(v+120). Bytes are
265 * stored in the high-non-zero-byte-first order.
266 *
267 * @param stream Binary output stream
268 * @param i Long to be serialized
269 * @throws java.io.IOException
270 */
271 public static void writeVLong(DataOutput stream, long i) throws IOException {
272 if (i >= -112 && i <= 127) {
273 stream.writeByte((byte)i);
274 return;
275 }
276
277 int len = -112;
278 if (i < 0) {
279 i ^= -1L; // take one's complement'
280 len = -120;
281 }
282
283 long tmp = i;
284 while (tmp != 0) {
285 tmp = tmp >> 8;
286 len--;
287 }
288
289 stream.writeByte((byte)len);
290
291 len = (len < -120) ? -(len + 120) : -(len + 112);
292
293 for (int idx = len; idx != 0; idx--) {
294 int shiftbits = (idx - 1) * 8;
295 long mask = 0xFFL << shiftbits;
296 stream.writeByte((byte)((i & mask) >> shiftbits));
297 }
298 }
299
300
301 /**
302 * Reads a zero-compressed encoded long from input stream and returns it.
303 * @param stream Binary input stream
304 * @throws java.io.IOException
305 * @return deserialized long from stream.
306 */
307 public static long readVLong(DataInput stream) throws IOException {
308 byte firstByte = stream.readByte();
309 int len = decodeVIntSize(firstByte);
310 if (len == 1) {
311 return firstByte;
312 }
313 long i = 0;
314 for (int idx = 0; idx < len-1; idx++) {
315 byte b = stream.readByte();
316 i = i << 8;
317 i = i | (b & 0xFF);
318 }
319 return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
320 }
321
322 /**
323 * Reads a zero-compressed encoded integer from input stream and returns it.
324 * @param stream Binary input stream
325 * @throws java.io.IOException
326 * @return deserialized integer from stream.
327 */
328 public static int readVInt(DataInput stream) throws IOException {
329 long n = readVLong(stream);
330 if ((n > Integer.MAX_VALUE) || (n < Integer.MIN_VALUE)) {
331 throw new IOException("value too long to fit in integer");
332 }
333 return (int)n;
334 }
335
336 /**
337 * Reads an integer from the input stream and returns it.
338 *
339 * This function validates that the integer is between [lower, upper],
340 * inclusive.
341 *
342 * @param stream Binary input stream
343 * @throws java.io.IOException
344 * @return deserialized integer from stream
345 */
346 public static int readVIntInRange(DataInput stream, int lower, int upper)
347 throws IOException {
348 long n = readVLong(stream);
349 if (n < lower) {
350 if (lower == 0) {
351 throw new IOException("expected non-negative integer, got " + n);
352 } else {
353 throw new IOException("expected integer greater than or equal to " +
354 lower + ", got " + n);
355 }
356 }
357 if (n > upper) {
358 throw new IOException("expected integer less or equal to " + upper +
359 ", got " + n);
360 }
361 return (int)n;
362 }
363
364 /**
365 * Given the first byte of a vint/vlong, determine the sign
366 * @param value the first byte
367 * @return is the value negative
368 */
369 public static boolean isNegativeVInt(byte value) {
370 return value < -120 || (value >= -112 && value < 0);
371 }
372
373 /**
374 * Parse the first byte of a vint/vlong to determine the number of bytes
375 * @param value the first byte of the vint/vlong
376 * @return the total number of bytes (1 to 9)
377 */
378 public static int decodeVIntSize(byte value) {
379 if (value >= -112) {
380 return 1;
381 } else if (value < -120) {
382 return -119 - value;
383 }
384 return -111 - value;
385 }
386
387 /**
388 * Get the encoded length if an integer is stored in a variable-length format
389 * @return the encoded length
390 */
391 public static int getVIntSize(long i) {
392 if (i >= -112 && i <= 127) {
393 return 1;
394 }
395
396 if (i < 0) {
397 i ^= -1L; // take one's complement'
398 }
399 // find the number of bytes with non-leading zeros
400 int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
401 // find the number of data bytes + length byte
402 return (dataBits + 7) / 8 + 1;
403 }
404 /**
405 * Read an Enum value from DataInput, Enums are read and written
406 * using String values.
407 * @param <T> Enum type
408 * @param in DataInput to read from
409 * @param enumType Class type of Enum
410 * @return Enum represented by String read from DataInput
411 * @throws IOException
412 */
413 public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType)
414 throws IOException{
415 return T.valueOf(enumType, Text.readString(in));
416 }
417 /**
418 * writes String value of enum to DataOutput.
419 * @param out Dataoutput stream
420 * @param enumVal enum value
421 * @throws IOException
422 */
423 public static void writeEnum(DataOutput out, Enum<?> enumVal)
424 throws IOException{
425 Text.writeString(out, enumVal.name());
426 }
427 /**
428 * Skip <i>len</i> number of bytes in input stream<i>in</i>
429 * @param in input stream
430 * @param len number of bytes to skip
431 * @throws IOException when skipped less number of bytes
432 */
433 public static void skipFully(DataInput in, int len) throws IOException {
434 int total = 0;
435 int cur = 0;
436
437 while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) {
438 total += cur;
439 }
440
441 if (total<len) {
442 throw new IOException("Not able to skip " + len + " bytes, possibly " +
443 "due to end of input.");
444 }
445 }
446
447 /** Convert writables to a byte array */
448 public static byte[] toByteArray(Writable... writables) {
449 final DataOutputBuffer out = new DataOutputBuffer();
450 try {
451 for(Writable w : writables) {
452 w.write(out);
453 }
454 out.close();
455 } catch (IOException e) {
456 throw new RuntimeException("Fail to convert writables to a byte array",e);
457 }
458 return out.getData();
459 }
460
461 /**
462 * Read a string, but check it for sanity. The format consists of a vint
463 * followed by the given number of bytes.
464 * @param in the stream to read from
465 * @param maxLength the largest acceptable length of the encoded string
466 * @return the bytes as a string
467 * @throws IOException if reading from the DataInput fails
468 * @throws IllegalArgumentException if the encoded byte size for string
469 is negative or larger than maxSize. Only the vint is read.
470 */
471 public static String readStringSafely(DataInput in,
472 int maxLength
473 ) throws IOException,
474 IllegalArgumentException {
475 int length = readVInt(in);
476 if (length < 0 || length > maxLength) {
477 throw new IllegalArgumentException("Encoded byte size for String was " + length +
478 ", which is outside of 0.." +
479 maxLength + " range.");
480 }
481 byte [] bytes = new byte[length];
482 in.readFully(bytes, 0, length);
483 return Text.decode(bytes);
484 }
485 }