001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io;
020    
021    import java.io.*;
022    
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.util.ReflectionUtils;
027    
028    import java.util.zip.GZIPInputStream;
029    import java.util.zip.GZIPOutputStream;
030    
031    @InterfaceAudience.Public
032    @InterfaceStability.Stable
033    public final class WritableUtils  {
034    
035      public static byte[] readCompressedByteArray(DataInput in) throws IOException {
036        int length = in.readInt();
037        if (length == -1) return null;
038        byte[] buffer = new byte[length];
039        in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
040        GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
041        byte[] outbuf = new byte[length];
042        ByteArrayOutputStream bos =  new ByteArrayOutputStream();
043        int len;
044        while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){
045          bos.write(outbuf, 0, len);
046        }
047        byte[] decompressed =  bos.toByteArray();
048        bos.close();
049        gzi.close();
050        return decompressed;
051      }
052    
053      public static void skipCompressedByteArray(DataInput in) throws IOException {
054        int length = in.readInt();
055        if (length != -1) {
056          skipFully(in, length);
057        }
058      }
059    
060      public static int  writeCompressedByteArray(DataOutput out, 
061                                                  byte[] bytes) throws IOException {
062        if (bytes != null) {
063          ByteArrayOutputStream bos =  new ByteArrayOutputStream();
064          GZIPOutputStream gzout = new GZIPOutputStream(bos);
065          try {
066            gzout.write(bytes, 0, bytes.length);
067            gzout.close();
068            gzout = null;
069          } finally {
070            IOUtils.closeStream(gzout);
071          }
072          byte[] buffer = bos.toByteArray();
073          int len = buffer.length;
074          out.writeInt(len);
075          out.write(buffer, 0, len);
076          /* debug only! Once we have confidence, can lose this. */
077          return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0);
078        } else {
079          out.writeInt(-1);
080          return -1;
081        }
082      }
083    
084    
085      /* Ugly utility, maybe someone else can do this better  */
086      public static String readCompressedString(DataInput in) throws IOException {
087        byte[] bytes = readCompressedByteArray(in);
088        if (bytes == null) return null;
089        return new String(bytes, "UTF-8");
090      }
091    
092    
093      public static int  writeCompressedString(DataOutput out, String s) throws IOException {
094        return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
095      }
096    
097      /*
098       *
099       * Write a String as a Network Int n, followed by n Bytes
100       * Alternative to 16 bit read/writeUTF.
101       * Encoding standard is... ?
102       * 
103       */
104      public static void writeString(DataOutput out, String s) throws IOException {
105        if (s != null) {
106          byte[] buffer = s.getBytes("UTF-8");
107          int len = buffer.length;
108          out.writeInt(len);
109          out.write(buffer, 0, len);
110        } else {
111          out.writeInt(-1);
112        }
113      }
114    
115      /*
116       * Read a String as a Network Int n, followed by n Bytes
117       * Alternative to 16 bit read/writeUTF.
118       * Encoding standard is... ?
119       *
120       */
121      public static String readString(DataInput in) throws IOException{
122        int length = in.readInt();
123        if (length == -1) return null;
124        byte[] buffer = new byte[length];
125        in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
126        return new String(buffer,"UTF-8");  
127      }
128    
129    
130      /*
131       * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
132       * Could be generalised using introspection.
133       *
134       */
135      public static void writeStringArray(DataOutput out, String[] s) throws IOException{
136        out.writeInt(s.length);
137        for(int i = 0; i < s.length; i++) {
138          writeString(out, s[i]);
139        }
140      }
141    
142      /*
143       * Write a String array as a Nework Int N, followed by Int N Byte Array of
144       * compressed Strings. Handles also null arrays and null values.
145       * Could be generalised using introspection.
146       *
147       */
148      public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{
149        if (s == null) {
150          out.writeInt(-1);
151          return;
152        }
153        out.writeInt(s.length);
154        for(int i = 0; i < s.length; i++) {
155          writeCompressedString(out, s[i]);
156        }
157      }
158    
159      /*
160       * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
161       * Could be generalised using introspection. Actually this bit couldn't...
162       *
163       */
164      public static String[] readStringArray(DataInput in) throws IOException {
165        int len = in.readInt();
166        if (len == -1) return null;
167        String[] s = new String[len];
168        for(int i = 0; i < len; i++) {
169          s[i] = readString(in);
170        }
171        return s;
172      }
173    
174    
175      /*
176       * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
177       * Could be generalised using introspection. Handles null arrays and null values.
178       *
179       */
180      public static  String[] readCompressedStringArray(DataInput in) throws IOException {
181        int len = in.readInt();
182        if (len == -1) return null;
183        String[] s = new String[len];
184        for(int i = 0; i < len; i++) {
185          s[i] = readCompressedString(in);
186        }
187        return s;
188      }
189    
190    
191      /*
192       *
193       * Test Utility Method Display Byte Array. 
194       *
195       */
196      public static void displayByteArray(byte[] record){
197        int i;
198        for(i=0;i < record.length -1; i++){
199          if (i % 16 == 0) { System.out.println(); }
200          System.out.print(Integer.toHexString(record[i]  >> 4 & 0x0F));
201          System.out.print(Integer.toHexString(record[i] & 0x0F));
202          System.out.print(",");
203        }
204        System.out.print(Integer.toHexString(record[i]  >> 4 & 0x0F));
205        System.out.print(Integer.toHexString(record[i] & 0x0F));
206        System.out.println();
207      }
208    
209      /**
210       * Make a copy of a writable object using serialization to a buffer.
211       * @param orig The object to copy
212       * @return The copied object
213       */
214      public static <T extends Writable> T clone(T orig, Configuration conf) {
215        try {
216          @SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T>
217          T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf);
218          ReflectionUtils.copy(conf, orig, newInst);
219          return newInst;
220        } catch (IOException e) {
221          throw new RuntimeException("Error writing/reading clone buffer", e);
222        }
223      }
224    
225      /**
226       * Make a copy of the writable object using serialiation to a buffer
227       * @param dst the object to copy from
228       * @param src the object to copy into, which is destroyed
229       * @throws IOException
230       * @deprecated use ReflectionUtils.cloneInto instead.
231       */
232      @Deprecated
233      public static void cloneInto(Writable dst, Writable src) throws IOException {
234        ReflectionUtils.cloneWritableInto(dst, src);
235      }
236    
237      /**
238       * Serializes an integer to a binary stream with zero-compressed encoding.
239       * For -120 <= i <= 127, only one byte is used with the actual value.
240       * For other values of i, the first byte value indicates whether the
241       * integer is positive or negative, and the number of bytes that follow.
242       * If the first byte value v is between -121 and -124, the following integer
243       * is positive, with number of bytes that follow are -(v+120).
244       * If the first byte value v is between -125 and -128, the following integer
245       * is negative, with number of bytes that follow are -(v+124). Bytes are
246       * stored in the high-non-zero-byte-first order.
247       *
248       * @param stream Binary output stream
249       * @param i Integer to be serialized
250       * @throws java.io.IOException 
251       */
252      public static void writeVInt(DataOutput stream, int i) throws IOException {
253        writeVLong(stream, i);
254      }
255      
256      /**
257       * Serializes a long to a binary stream with zero-compressed encoding.
258       * For -112 <= i <= 127, only one byte is used with the actual value.
259       * For other values of i, the first byte value indicates whether the
260       * long is positive or negative, and the number of bytes that follow.
261       * If the first byte value v is between -113 and -120, the following long
262       * is positive, with number of bytes that follow are -(v+112).
263       * If the first byte value v is between -121 and -128, the following long
264       * is negative, with number of bytes that follow are -(v+120). Bytes are
265       * stored in the high-non-zero-byte-first order.
266       * 
267       * @param stream Binary output stream
268       * @param i Long to be serialized
269       * @throws java.io.IOException 
270       */
271      public static void writeVLong(DataOutput stream, long i) throws IOException {
272        if (i >= -112 && i <= 127) {
273          stream.writeByte((byte)i);
274          return;
275        }
276          
277        int len = -112;
278        if (i < 0) {
279          i ^= -1L; // take one's complement'
280          len = -120;
281        }
282          
283        long tmp = i;
284        while (tmp != 0) {
285          tmp = tmp >> 8;
286          len--;
287        }
288          
289        stream.writeByte((byte)len);
290          
291        len = (len < -120) ? -(len + 120) : -(len + 112);
292          
293        for (int idx = len; idx != 0; idx--) {
294          int shiftbits = (idx - 1) * 8;
295          long mask = 0xFFL << shiftbits;
296          stream.writeByte((byte)((i & mask) >> shiftbits));
297        }
298      }
299      
300    
301      /**
302       * Reads a zero-compressed encoded long from input stream and returns it.
303       * @param stream Binary input stream
304       * @throws java.io.IOException 
305       * @return deserialized long from stream.
306       */
307      public static long readVLong(DataInput stream) throws IOException {
308        byte firstByte = stream.readByte();
309        int len = decodeVIntSize(firstByte);
310        if (len == 1) {
311          return firstByte;
312        }
313        long i = 0;
314        for (int idx = 0; idx < len-1; idx++) {
315          byte b = stream.readByte();
316          i = i << 8;
317          i = i | (b & 0xFF);
318        }
319        return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
320      }
321    
322      /**
323       * Reads a zero-compressed encoded integer from input stream and returns it.
324       * @param stream Binary input stream
325       * @throws java.io.IOException 
326       * @return deserialized integer from stream.
327       */
328      public static int readVInt(DataInput stream) throws IOException {
329        long n = readVLong(stream);
330        if ((n > Integer.MAX_VALUE) || (n < Integer.MIN_VALUE)) {
331          throw new IOException("value too long to fit in integer");
332        }
333        return (int)n;
334      }
335    
336      /**
337       * Reads an integer from the input stream and returns it.
338       *
339       * This function validates that the integer is between [lower, upper],
340       * inclusive.
341       *
342       * @param stream Binary input stream
343       * @throws java.io.IOException
344       * @return deserialized integer from stream
345       */
346      public static int readVIntInRange(DataInput stream, int lower, int upper)
347          throws IOException {
348        long n = readVLong(stream);
349        if (n < lower) {
350          if (lower == 0) {
351            throw new IOException("expected non-negative integer, got " + n);
352          } else {
353            throw new IOException("expected integer greater than or equal to " +
354                lower + ", got " + n);
355          }
356        }
357        if (n > upper) {
358          throw new IOException("expected integer less or equal to " + upper +
359              ", got " + n);
360        }
361        return (int)n;
362      }
363    
364      /**
365       * Given the first byte of a vint/vlong, determine the sign
366       * @param value the first byte
367       * @return is the value negative
368       */
369      public static boolean isNegativeVInt(byte value) {
370        return value < -120 || (value >= -112 && value < 0);
371      }
372    
373      /**
374       * Parse the first byte of a vint/vlong to determine the number of bytes
375       * @param value the first byte of the vint/vlong
376       * @return the total number of bytes (1 to 9)
377       */
378      public static int decodeVIntSize(byte value) {
379        if (value >= -112) {
380          return 1;
381        } else if (value < -120) {
382          return -119 - value;
383        }
384        return -111 - value;
385      }
386    
387      /**
388       * Get the encoded length if an integer is stored in a variable-length format
389       * @return the encoded length 
390       */
391      public static int getVIntSize(long i) {
392        if (i >= -112 && i <= 127) {
393          return 1;
394        }
395          
396        if (i < 0) {
397          i ^= -1L; // take one's complement'
398        }
399        // find the number of bytes with non-leading zeros
400        int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
401        // find the number of data bytes + length byte
402        return (dataBits + 7) / 8 + 1;
403      }
404      /**
405       * Read an Enum value from DataInput, Enums are read and written 
406       * using String values. 
407       * @param <T> Enum type
408       * @param in DataInput to read from 
409       * @param enumType Class type of Enum
410       * @return Enum represented by String read from DataInput
411       * @throws IOException
412       */
413      public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType)
414        throws IOException{
415        return T.valueOf(enumType, Text.readString(in));
416      }
417      /**
418       * writes String value of enum to DataOutput. 
419       * @param out Dataoutput stream
420       * @param enumVal enum value
421       * @throws IOException
422       */
423      public static void writeEnum(DataOutput out,  Enum<?> enumVal) 
424        throws IOException{
425        Text.writeString(out, enumVal.name()); 
426      }
427      /**
428       * Skip <i>len</i> number of bytes in input stream<i>in</i>
429       * @param in input stream
430       * @param len number of bytes to skip
431       * @throws IOException when skipped less number of bytes
432       */
433      public static void skipFully(DataInput in, int len) throws IOException {
434        int total = 0;
435        int cur = 0;
436    
437        while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) {
438            total += cur;
439        }
440    
441        if (total<len) {
442          throw new IOException("Not able to skip " + len + " bytes, possibly " +
443                                "due to end of input.");
444        }
445      }
446    
447      /** Convert writables to a byte array */
448      public static byte[] toByteArray(Writable... writables) {
449        final DataOutputBuffer out = new DataOutputBuffer();
450        try {
451          for(Writable w : writables) {
452            w.write(out);
453          }
454          out.close();
455        } catch (IOException e) {
456          throw new RuntimeException("Fail to convert writables to a byte array",e);
457        }
458        return out.getData();
459      }
460    
461      /**
462       * Read a string, but check it for sanity. The format consists of a vint
463       * followed by the given number of bytes.
464       * @param in the stream to read from
465       * @param maxLength the largest acceptable length of the encoded string
466       * @return the bytes as a string
467       * @throws IOException if reading from the DataInput fails
468       * @throws IllegalArgumentException if the encoded byte size for string 
469                 is negative or larger than maxSize. Only the vint is read.
470       */
471      public static String readStringSafely(DataInput in,
472                                            int maxLength
473                                            ) throws IOException, 
474                                                     IllegalArgumentException {
475        int length = readVInt(in);
476        if (length < 0 || length > maxLength) {
477          throw new IllegalArgumentException("Encoded byte size for String was " + length + 
478                                             ", which is outside of 0.." +
479                                             maxLength + " range.");
480        }
481        byte [] bytes = new byte[length];
482        in.readFully(bytes, 0, length);
483        return Text.decode(bytes);
484      }
485    }