001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io;
020    
021    import java.lang.reflect.Array;
022    import java.lang.reflect.InvocationTargetException;
023    import java.lang.reflect.Method;
024    
025    import java.io.*;
026    import java.util.*;
027    
028    import org.apache.hadoop.classification.InterfaceAudience;
029    import org.apache.hadoop.classification.InterfaceStability;
030    import org.apache.hadoop.conf.*;
031    import org.apache.hadoop.util.ProtoUtil;
032    
033    import com.google.protobuf.Message;
034    
035    /** A polymorphic Writable that writes an instance with it's class name.
036     * Handles arrays, strings and primitive types without a Writable wrapper.
037     */
038    @InterfaceAudience.Public
039    @InterfaceStability.Stable
040    public class ObjectWritable implements Writable, Configurable {
041    
042      private Class declaredClass;
043      private Object instance;
044      private Configuration conf;
045    
046      public ObjectWritable() {}
047      
048      public ObjectWritable(Object instance) {
049        set(instance);
050      }
051    
052      public ObjectWritable(Class declaredClass, Object instance) {
053        this.declaredClass = declaredClass;
054        this.instance = instance;
055      }
056    
057      /** Return the instance, or null if none. */
058      public Object get() { return instance; }
059      
060      /** Return the class this is meant to be. */
061      public Class getDeclaredClass() { return declaredClass; }
062      
063      /** Reset the instance. */
064      public void set(Object instance) {
065        this.declaredClass = instance.getClass();
066        this.instance = instance;
067      }
068      
069      @Override
070      public String toString() {
071        return "OW[class=" + declaredClass + ",value=" + instance + "]";
072      }
073    
074      
075      @Override
076      public void readFields(DataInput in) throws IOException {
077        readObject(in, this, this.conf);
078      }
079      
080      @Override
081      public void write(DataOutput out) throws IOException {
082        writeObject(out, instance, declaredClass, conf);
083      }
084    
085      private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
086      static {
087        PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
088        PRIMITIVE_NAMES.put("byte", Byte.TYPE);
089        PRIMITIVE_NAMES.put("char", Character.TYPE);
090        PRIMITIVE_NAMES.put("short", Short.TYPE);
091        PRIMITIVE_NAMES.put("int", Integer.TYPE);
092        PRIMITIVE_NAMES.put("long", Long.TYPE);
093        PRIMITIVE_NAMES.put("float", Float.TYPE);
094        PRIMITIVE_NAMES.put("double", Double.TYPE);
095        PRIMITIVE_NAMES.put("void", Void.TYPE);
096      }
097    
098      private static class NullInstance extends Configured implements Writable {
099        private Class<?> declaredClass;
100        public NullInstance() { super(null); }
101        public NullInstance(Class declaredClass, Configuration conf) {
102          super(conf);
103          this.declaredClass = declaredClass;
104        }
105        @Override
106        public void readFields(DataInput in) throws IOException {
107          String className = UTF8.readString(in);
108          declaredClass = PRIMITIVE_NAMES.get(className);
109          if (declaredClass == null) {
110            try {
111              declaredClass = getConf().getClassByName(className);
112            } catch (ClassNotFoundException e) {
113              throw new RuntimeException(e.toString());
114            }
115          }
116        }
117        @Override
118        public void write(DataOutput out) throws IOException {
119          UTF8.writeString(out, declaredClass.getName());
120        }
121      }
122    
123      /** Write a {@link Writable}, {@link String}, primitive type, or an array of
124       * the preceding. */
125      public static void writeObject(DataOutput out, Object instance,
126                                     Class declaredClass, 
127                                     Configuration conf) throws IOException {
128        writeObject(out, instance, declaredClass, conf, false);
129      }
130      
131        /** 
132         * Write a {@link Writable}, {@link String}, primitive type, or an array of
133         * the preceding.  
134         * 
135         * @param allowCompactArrays - set true for RPC and internal or intra-cluster
136         * usages.  Set false for inter-cluster, File, and other persisted output 
137         * usages, to preserve the ability to interchange files with other clusters 
138         * that may not be running the same version of software.  Sometime in ~2013 
139         * we can consider removing this parameter and always using the compact format.
140         */
141        public static void writeObject(DataOutput out, Object instance,
142            Class declaredClass, Configuration conf, boolean allowCompactArrays) 
143        throws IOException {
144    
145        if (instance == null) {                       // null
146          instance = new NullInstance(declaredClass, conf);
147          declaredClass = Writable.class;
148        }
149        
150        // Special case: must come before writing out the declaredClass.
151        // If this is an eligible array of primitives,
152        // wrap it in an ArrayPrimitiveWritable$Internal wrapper class.
153        if (allowCompactArrays && declaredClass.isArray()
154            && instance.getClass().getName().equals(declaredClass.getName())
155            && instance.getClass().getComponentType().isPrimitive()) {
156          instance = new ArrayPrimitiveWritable.Internal(instance);
157          declaredClass = ArrayPrimitiveWritable.Internal.class;
158        }
159    
160        UTF8.writeString(out, declaredClass.getName()); // always write declared
161    
162        if (declaredClass.isArray()) {     // non-primitive or non-compact array
163          int length = Array.getLength(instance);
164          out.writeInt(length);
165          for (int i = 0; i < length; i++) {
166            writeObject(out, Array.get(instance, i),
167                declaredClass.getComponentType(), conf, allowCompactArrays);
168          }
169          
170        } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
171          ((ArrayPrimitiveWritable.Internal) instance).write(out);
172          
173        } else if (declaredClass == String.class) {   // String
174          UTF8.writeString(out, (String)instance);
175          
176        } else if (declaredClass.isPrimitive()) {     // primitive type
177    
178          if (declaredClass == Boolean.TYPE) {        // boolean
179            out.writeBoolean(((Boolean)instance).booleanValue());
180          } else if (declaredClass == Character.TYPE) { // char
181            out.writeChar(((Character)instance).charValue());
182          } else if (declaredClass == Byte.TYPE) {    // byte
183            out.writeByte(((Byte)instance).byteValue());
184          } else if (declaredClass == Short.TYPE) {   // short
185            out.writeShort(((Short)instance).shortValue());
186          } else if (declaredClass == Integer.TYPE) { // int
187            out.writeInt(((Integer)instance).intValue());
188          } else if (declaredClass == Long.TYPE) {    // long
189            out.writeLong(((Long)instance).longValue());
190          } else if (declaredClass == Float.TYPE) {   // float
191            out.writeFloat(((Float)instance).floatValue());
192          } else if (declaredClass == Double.TYPE) {  // double
193            out.writeDouble(((Double)instance).doubleValue());
194          } else if (declaredClass == Void.TYPE) {    // void
195          } else {
196            throw new IllegalArgumentException("Not a primitive: "+declaredClass);
197          }
198        } else if (declaredClass.isEnum()) {         // enum
199          UTF8.writeString(out, ((Enum)instance).name());
200        } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
201          UTF8.writeString(out, instance.getClass().getName());
202          ((Writable)instance).write(out);
203    
204        } else if (Message.class.isAssignableFrom(declaredClass)) {
205          ((Message)instance).writeDelimitedTo(
206              DataOutputOutputStream.constructOutputStream(out));
207        } else {
208          throw new IOException("Can't write: "+instance+" as "+declaredClass);
209        }
210      }
211      
212      
213      /** Read a {@link Writable}, {@link String}, primitive type, or an array of
214       * the preceding. */
215      public static Object readObject(DataInput in, Configuration conf)
216        throws IOException {
217        return readObject(in, null, conf);
218      }
219        
220      /** Read a {@link Writable}, {@link String}, primitive type, or an array of
221       * the preceding. */
222      @SuppressWarnings("unchecked")
223      public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
224        throws IOException {
225        String className = UTF8.readString(in);
226        Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
227        if (declaredClass == null) {
228          declaredClass = loadClass(conf, className);
229        }
230        
231        Object instance;
232        
233        if (declaredClass.isPrimitive()) {            // primitive types
234    
235          if (declaredClass == Boolean.TYPE) {             // boolean
236            instance = Boolean.valueOf(in.readBoolean());
237          } else if (declaredClass == Character.TYPE) {    // char
238            instance = Character.valueOf(in.readChar());
239          } else if (declaredClass == Byte.TYPE) {         // byte
240            instance = Byte.valueOf(in.readByte());
241          } else if (declaredClass == Short.TYPE) {        // short
242            instance = Short.valueOf(in.readShort());
243          } else if (declaredClass == Integer.TYPE) {      // int
244            instance = Integer.valueOf(in.readInt());
245          } else if (declaredClass == Long.TYPE) {         // long
246            instance = Long.valueOf(in.readLong());
247          } else if (declaredClass == Float.TYPE) {        // float
248            instance = Float.valueOf(in.readFloat());
249          } else if (declaredClass == Double.TYPE) {       // double
250            instance = Double.valueOf(in.readDouble());
251          } else if (declaredClass == Void.TYPE) {         // void
252            instance = null;
253          } else {
254            throw new IllegalArgumentException("Not a primitive: "+declaredClass);
255          }
256    
257        } else if (declaredClass.isArray()) {              // array
258          int length = in.readInt();
259          instance = Array.newInstance(declaredClass.getComponentType(), length);
260          for (int i = 0; i < length; i++) {
261            Array.set(instance, i, readObject(in, conf));
262          }
263          
264        } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
265          // Read and unwrap ArrayPrimitiveWritable$Internal array.
266          // Always allow the read, even if write is disabled by allowCompactArrays.
267          ArrayPrimitiveWritable.Internal temp = 
268              new ArrayPrimitiveWritable.Internal();
269          temp.readFields(in);
270          instance = temp.get();
271          declaredClass = instance.getClass();
272    
273        } else if (declaredClass == String.class) {        // String
274          instance = UTF8.readString(in);
275        } else if (declaredClass.isEnum()) {         // enum
276          instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
277        } else if (Message.class.isAssignableFrom(declaredClass)) {
278          instance = tryInstantiateProtobuf(declaredClass, in);
279        } else {                                      // Writable
280          Class instanceClass = null;
281          String str = UTF8.readString(in);
282          instanceClass = loadClass(conf, str);
283          
284          Writable writable = WritableFactories.newInstance(instanceClass, conf);
285          writable.readFields(in);
286          instance = writable;
287    
288          if (instanceClass == NullInstance.class) {  // null
289            declaredClass = ((NullInstance)instance).declaredClass;
290            instance = null;
291          }
292        }
293    
294        if (objectWritable != null) {                 // store values
295          objectWritable.declaredClass = declaredClass;
296          objectWritable.instance = instance;
297        }
298    
299        return instance;
300          
301      }
302    
303      /**
304       * Try to instantiate a protocol buffer of the given message class
305       * from the given input stream.
306       * 
307       * @param protoClass the class of the generated protocol buffer
308       * @param dataIn the input stream to read from
309       * @return the instantiated Message instance
310       * @throws IOException if an IO problem occurs
311       */
312      private static Message tryInstantiateProtobuf(
313          Class<?> protoClass,
314          DataInput dataIn) throws IOException {
315    
316        try {
317          if (dataIn instanceof InputStream) {
318            // We can use the built-in parseDelimitedFrom and not have to re-copy
319            // the data
320            Method parseMethod = getStaticProtobufMethod(protoClass,
321                "parseDelimitedFrom", InputStream.class);
322            return (Message)parseMethod.invoke(null, (InputStream)dataIn);
323          } else {
324            // Have to read it into a buffer first, since protobuf doesn't deal
325            // with the DataInput interface directly.
326            
327            // Read the size delimiter that writeDelimitedTo writes
328            int size = ProtoUtil.readRawVarint32(dataIn);
329            if (size < 0) {
330              throw new IOException("Invalid size: " + size);
331            }
332          
333            byte[] data = new byte[size];
334            dataIn.readFully(data);
335            Method parseMethod = getStaticProtobufMethod(protoClass,
336                "parseFrom", byte[].class);
337            return (Message)parseMethod.invoke(null, data);
338          }
339        } catch (InvocationTargetException e) {
340          
341          if (e.getCause() instanceof IOException) {
342            throw (IOException)e.getCause();
343          } else {
344            throw new IOException(e.getCause());
345          }
346        } catch (IllegalAccessException iae) {
347          throw new AssertionError("Could not access parse method in " +
348              protoClass);
349        }
350      }
351    
352      static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
353          Class<?> ... args) {
354    
355        try {
356          return declaredClass.getMethod(method, args);
357        } catch (Exception e) {
358          // This is a bug in Hadoop - protobufs should all have this static method
359          throw new AssertionError("Protocol buffer class " + declaredClass +
360              " does not have an accessible parseFrom(InputStream) method!");
361        }
362      }
363    
364      /**
365       * Find and load the class with given name <tt>className</tt> by first finding
366       * it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
367       * try load it directly.
368       */
369      public static Class<?> loadClass(Configuration conf, String className) {
370        Class<?> declaredClass = null;
371        try {
372          if (conf != null)
373            declaredClass = conf.getClassByName(className);
374          else
375            declaredClass = Class.forName(className);
376        } catch (ClassNotFoundException e) {
377          throw new RuntimeException("readObject can't find class " + className,
378              e);
379        }
380        return declaredClass;
381      }
382    
383      @Override
384      public void setConf(Configuration conf) {
385        this.conf = conf;
386      }
387    
388      @Override
389      public Configuration getConf() {
390        return this.conf;
391      }
392      
393    }