001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.util.ReflectionUtils;
027
028import java.util.zip.GZIPInputStream;
029import java.util.zip.GZIPOutputStream;
030
031@InterfaceAudience.Public
032@InterfaceStability.Stable
033public final class WritableUtils  {
034
035  public static byte[] readCompressedByteArray(DataInput in) throws IOException {
036    int length = in.readInt();
037    if (length == -1) return null;
038    byte[] buffer = new byte[length];
039    in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
040    GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
041    byte[] outbuf = new byte[length];
042    ByteArrayOutputStream bos =  new ByteArrayOutputStream();
043    int len;
044    while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){
045      bos.write(outbuf, 0, len);
046    }
047    byte[] decompressed =  bos.toByteArray();
048    bos.close();
049    gzi.close();
050    return decompressed;
051  }
052
053  public static void skipCompressedByteArray(DataInput in) throws IOException {
054    int length = in.readInt();
055    if (length != -1) {
056      skipFully(in, length);
057    }
058  }
059
060  public static int  writeCompressedByteArray(DataOutput out, 
061                                              byte[] bytes) throws IOException {
062    if (bytes != null) {
063      ByteArrayOutputStream bos =  new ByteArrayOutputStream();
064      GZIPOutputStream gzout = new GZIPOutputStream(bos);
065      try {
066        gzout.write(bytes, 0, bytes.length);
067        gzout.close();
068        gzout = null;
069      } finally {
070        IOUtils.closeStream(gzout);
071      }
072      byte[] buffer = bos.toByteArray();
073      int len = buffer.length;
074      out.writeInt(len);
075      out.write(buffer, 0, len);
076      /* debug only! Once we have confidence, can lose this. */
077      return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0);
078    } else {
079      out.writeInt(-1);
080      return -1;
081    }
082  }
083
084
085  /* Ugly utility, maybe someone else can do this better  */
086  public static String readCompressedString(DataInput in) throws IOException {
087    byte[] bytes = readCompressedByteArray(in);
088    if (bytes == null) return null;
089    return new String(bytes, "UTF-8");
090  }
091
092
093  public static int  writeCompressedString(DataOutput out, String s) throws IOException {
094    return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
095  }
096
097  /*
098   *
099   * Write a String as a Network Int n, followed by n Bytes
100   * Alternative to 16 bit read/writeUTF.
101   * Encoding standard is... ?
102   * 
103   */
104  public static void writeString(DataOutput out, String s) throws IOException {
105    if (s != null) {
106      byte[] buffer = s.getBytes("UTF-8");
107      int len = buffer.length;
108      out.writeInt(len);
109      out.write(buffer, 0, len);
110    } else {
111      out.writeInt(-1);
112    }
113  }
114
115  /*
116   * Read a String as a Network Int n, followed by n Bytes
117   * Alternative to 16 bit read/writeUTF.
118   * Encoding standard is... ?
119   *
120   */
121  public static String readString(DataInput in) throws IOException{
122    int length = in.readInt();
123    if (length == -1) return null;
124    byte[] buffer = new byte[length];
125    in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
126    return new String(buffer,"UTF-8");  
127  }
128
129
130  /*
131   * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
132   * Could be generalised using introspection.
133   *
134   */
135  public static void writeStringArray(DataOutput out, String[] s) throws IOException{
136    out.writeInt(s.length);
137    for(int i = 0; i < s.length; i++) {
138      writeString(out, s[i]);
139    }
140  }
141
142  /*
143   * Write a String array as a Nework Int N, followed by Int N Byte Array of
144   * compressed Strings. Handles also null arrays and null values.
145   * Could be generalised using introspection.
146   *
147   */
148  public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{
149    if (s == null) {
150      out.writeInt(-1);
151      return;
152    }
153    out.writeInt(s.length);
154    for(int i = 0; i < s.length; i++) {
155      writeCompressedString(out, s[i]);
156    }
157  }
158
159  /*
160   * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
161   * Could be generalised using introspection. Actually this bit couldn't...
162   *
163   */
164  public static String[] readStringArray(DataInput in) throws IOException {
165    int len = in.readInt();
166    if (len == -1) return null;
167    String[] s = new String[len];
168    for(int i = 0; i < len; i++) {
169      s[i] = readString(in);
170    }
171    return s;
172  }
173
174
175  /*
176   * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
177   * Could be generalised using introspection. Handles null arrays and null values.
178   *
179   */
180  public static  String[] readCompressedStringArray(DataInput in) throws IOException {
181    int len = in.readInt();
182    if (len == -1) return null;
183    String[] s = new String[len];
184    for(int i = 0; i < len; i++) {
185      s[i] = readCompressedString(in);
186    }
187    return s;
188  }
189
190
191  /*
192   *
193   * Test Utility Method Display Byte Array. 
194   *
195   */
196  public static void displayByteArray(byte[] record){
197    int i;
198    for(i=0;i < record.length -1; i++){
199      if (i % 16 == 0) { System.out.println(); }
200      System.out.print(Integer.toHexString(record[i]  >> 4 & 0x0F));
201      System.out.print(Integer.toHexString(record[i] & 0x0F));
202      System.out.print(",");
203    }
204    System.out.print(Integer.toHexString(record[i]  >> 4 & 0x0F));
205    System.out.print(Integer.toHexString(record[i] & 0x0F));
206    System.out.println();
207  }
208
209  /**
210   * Make a copy of a writable object using serialization to a buffer.
211   * @param orig The object to copy
212   * @return The copied object
213   */
214  public static <T extends Writable> T clone(T orig, Configuration conf) {
215    try {
216      @SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T>
217      T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf);
218      ReflectionUtils.copy(conf, orig, newInst);
219      return newInst;
220    } catch (IOException e) {
221      throw new RuntimeException("Error writing/reading clone buffer", e);
222    }
223  }
224
225  /**
226   * Make a copy of the writable object using serialiation to a buffer
227   * @param dst the object to copy from
228   * @param src the object to copy into, which is destroyed
229   * @throws IOException
230   * @deprecated use ReflectionUtils.cloneInto instead.
231   */
232  @Deprecated
233  public static void cloneInto(Writable dst, Writable src) throws IOException {
234    ReflectionUtils.cloneWritableInto(dst, src);
235  }
236
237  /**
238   * Serializes an integer to a binary stream with zero-compressed encoding.
239   * For -112 <= i <= 127, only one byte is used with the actual value.
240   * For other values of i, the first byte value indicates whether the
241   * integer is positive or negative, and the number of bytes that follow.
242   * If the first byte value v is between -113 and -116, the following integer
243   * is positive, with number of bytes that follow are -(v+112).
244   * If the first byte value v is between -121 and -124, the following integer
245   * is negative, with number of bytes that follow are -(v+120). Bytes are
246   * stored in the high-non-zero-byte-first order.
247   *
248   * @param stream Binary output stream
249   * @param i Integer to be serialized
250   * @throws java.io.IOException 
251   */
252  public static void writeVInt(DataOutput stream, int i) throws IOException {
253    writeVLong(stream, i);
254  }
255  
256  /**
257   * Serializes a long to a binary stream with zero-compressed encoding.
258   * For -112 <= i <= 127, only one byte is used with the actual value.
259   * For other values of i, the first byte value indicates whether the
260   * long is positive or negative, and the number of bytes that follow.
261   * If the first byte value v is between -113 and -120, the following long
262   * is positive, with number of bytes that follow are -(v+112).
263   * If the first byte value v is between -121 and -128, the following long
264   * is negative, with number of bytes that follow are -(v+120). Bytes are
265   * stored in the high-non-zero-byte-first order.
266   * 
267   * @param stream Binary output stream
268   * @param i Long to be serialized
269   * @throws java.io.IOException 
270   */
271  public static void writeVLong(DataOutput stream, long i) throws IOException {
272    if (i >= -112 && i <= 127) {
273      stream.writeByte((byte)i);
274      return;
275    }
276      
277    int len = -112;
278    if (i < 0) {
279      i ^= -1L; // take one's complement'
280      len = -120;
281    }
282      
283    long tmp = i;
284    while (tmp != 0) {
285      tmp = tmp >> 8;
286      len--;
287    }
288      
289    stream.writeByte((byte)len);
290      
291    len = (len < -120) ? -(len + 120) : -(len + 112);
292      
293    for (int idx = len; idx != 0; idx--) {
294      int shiftbits = (idx - 1) * 8;
295      long mask = 0xFFL << shiftbits;
296      stream.writeByte((byte)((i & mask) >> shiftbits));
297    }
298  }
299  
300
301  /**
302   * Reads a zero-compressed encoded long from input stream and returns it.
303   * @param stream Binary input stream
304   * @throws java.io.IOException 
305   * @return deserialized long from stream.
306   */
307  public static long readVLong(DataInput stream) throws IOException {
308    byte firstByte = stream.readByte();
309    int len = decodeVIntSize(firstByte);
310    if (len == 1) {
311      return firstByte;
312    }
313    long i = 0;
314    for (int idx = 0; idx < len-1; idx++) {
315      byte b = stream.readByte();
316      i = i << 8;
317      i = i | (b & 0xFF);
318    }
319    return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
320  }
321
322  /**
323   * Reads a zero-compressed encoded integer from input stream and returns it.
324   * @param stream Binary input stream
325   * @throws java.io.IOException 
326   * @return deserialized integer from stream.
327   */
328  public static int readVInt(DataInput stream) throws IOException {
329    long n = readVLong(stream);
330    if ((n > Integer.MAX_VALUE) || (n < Integer.MIN_VALUE)) {
331      throw new IOException("value too long to fit in integer");
332    }
333    return (int)n;
334  }
335
336  /**
337   * Reads an integer from the input stream and returns it.
338   *
339   * This function validates that the integer is between [lower, upper],
340   * inclusive.
341   *
342   * @param stream Binary input stream
343   * @throws java.io.IOException
344   * @return deserialized integer from stream
345   */
346  public static int readVIntInRange(DataInput stream, int lower, int upper)
347      throws IOException {
348    long n = readVLong(stream);
349    if (n < lower) {
350      if (lower == 0) {
351        throw new IOException("expected non-negative integer, got " + n);
352      } else {
353        throw new IOException("expected integer greater than or equal to " +
354            lower + ", got " + n);
355      }
356    }
357    if (n > upper) {
358      throw new IOException("expected integer less or equal to " + upper +
359          ", got " + n);
360    }
361    return (int)n;
362  }
363
364  /**
365   * Given the first byte of a vint/vlong, determine the sign
366   * @param value the first byte
367   * @return is the value negative
368   */
369  public static boolean isNegativeVInt(byte value) {
370    return value < -120 || (value >= -112 && value < 0);
371  }
372
373  /**
374   * Parse the first byte of a vint/vlong to determine the number of bytes
375   * @param value the first byte of the vint/vlong
376   * @return the total number of bytes (1 to 9)
377   */
378  public static int decodeVIntSize(byte value) {
379    if (value >= -112) {
380      return 1;
381    } else if (value < -120) {
382      return -119 - value;
383    }
384    return -111 - value;
385  }
386
387  /**
388   * Get the encoded length if an integer is stored in a variable-length format
389   * @return the encoded length 
390   */
391  public static int getVIntSize(long i) {
392    if (i >= -112 && i <= 127) {
393      return 1;
394    }
395      
396    if (i < 0) {
397      i ^= -1L; // take one's complement'
398    }
399    // find the number of bytes with non-leading zeros
400    int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
401    // find the number of data bytes + length byte
402    return (dataBits + 7) / 8 + 1;
403  }
404  /**
405   * Read an Enum value from DataInput, Enums are read and written 
406   * using String values. 
407   * @param <T> Enum type
408   * @param in DataInput to read from 
409   * @param enumType Class type of Enum
410   * @return Enum represented by String read from DataInput
411   * @throws IOException
412   */
413  public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType)
414    throws IOException{
415    return T.valueOf(enumType, Text.readString(in));
416  }
417  /**
418   * writes String value of enum to DataOutput. 
419   * @param out Dataoutput stream
420   * @param enumVal enum value
421   * @throws IOException
422   */
423  public static void writeEnum(DataOutput out,  Enum<?> enumVal) 
424    throws IOException{
425    Text.writeString(out, enumVal.name()); 
426  }
427  /**
428   * Skip <i>len</i> number of bytes in input stream<i>in</i>
429   * @param in input stream
430   * @param len number of bytes to skip
431   * @throws IOException when skipped less number of bytes
432   */
433  public static void skipFully(DataInput in, int len) throws IOException {
434    int total = 0;
435    int cur = 0;
436
437    while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) {
438        total += cur;
439    }
440
441    if (total<len) {
442      throw new IOException("Not able to skip " + len + " bytes, possibly " +
443                            "due to end of input.");
444    }
445  }
446
447  /** Convert writables to a byte array */
448  public static byte[] toByteArray(Writable... writables) {
449    final DataOutputBuffer out = new DataOutputBuffer();
450    try {
451      for(Writable w : writables) {
452        w.write(out);
453      }
454      out.close();
455    } catch (IOException e) {
456      throw new RuntimeException("Fail to convert writables to a byte array",e);
457    }
458    return out.getData();
459  }
460
461  /**
462   * Read a string, but check it for sanity. The format consists of a vint
463   * followed by the given number of bytes.
464   * @param in the stream to read from
465   * @param maxLength the largest acceptable length of the encoded string
466   * @return the bytes as a string
467   * @throws IOException if reading from the DataInput fails
468   * @throws IllegalArgumentException if the encoded byte size for string 
469             is negative or larger than maxSize. Only the vint is read.
470   */
471  public static String readStringSafely(DataInput in,
472                                        int maxLength
473                                        ) throws IOException, 
474                                                 IllegalArgumentException {
475    int length = readVInt(in);
476    if (length < 0 || length > maxLength) {
477      throw new IllegalArgumentException("Encoded byte size for String was " + length + 
478                                         ", which is outside of 0.." +
479                                         maxLength + " range.");
480    }
481    byte [] bytes = new byte[length];
482    in.readFully(bytes, 0, length);
483    return Text.decode(bytes);
484  }
485}