001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.lang.reflect.Array;
022import java.lang.reflect.InvocationTargetException;
023import java.lang.reflect.Method;
024
025import java.io.*;
026import java.util.*;
027
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.*;
031import org.apache.hadoop.util.ProtoUtil;
032
033import com.google.protobuf.Message;
034
035/** A polymorphic Writable that writes an instance with it's class name.
036 * Handles arrays, strings and primitive types without a Writable wrapper.
037 */
038@InterfaceAudience.Public
039@InterfaceStability.Stable
040public class ObjectWritable implements Writable, Configurable {
041
042  private Class declaredClass;
043  private Object instance;
044  private Configuration conf;
045
046  public ObjectWritable() {}
047  
048  public ObjectWritable(Object instance) {
049    set(instance);
050  }
051
052  public ObjectWritable(Class declaredClass, Object instance) {
053    this.declaredClass = declaredClass;
054    this.instance = instance;
055  }
056
057  /** Return the instance, or null if none. */
058  public Object get() { return instance; }
059  
060  /** Return the class this is meant to be. */
061  public Class getDeclaredClass() { return declaredClass; }
062  
063  /** Reset the instance. */
064  public void set(Object instance) {
065    this.declaredClass = instance.getClass();
066    this.instance = instance;
067  }
068  
069  @Override
070  public String toString() {
071    return "OW[class=" + declaredClass + ",value=" + instance + "]";
072  }
073
074  
075  @Override
076  public void readFields(DataInput in) throws IOException {
077    readObject(in, this, this.conf);
078  }
079  
080  @Override
081  public void write(DataOutput out) throws IOException {
082    writeObject(out, instance, declaredClass, conf);
083  }
084
085  private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
086  static {
087    PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
088    PRIMITIVE_NAMES.put("byte", Byte.TYPE);
089    PRIMITIVE_NAMES.put("char", Character.TYPE);
090    PRIMITIVE_NAMES.put("short", Short.TYPE);
091    PRIMITIVE_NAMES.put("int", Integer.TYPE);
092    PRIMITIVE_NAMES.put("long", Long.TYPE);
093    PRIMITIVE_NAMES.put("float", Float.TYPE);
094    PRIMITIVE_NAMES.put("double", Double.TYPE);
095    PRIMITIVE_NAMES.put("void", Void.TYPE);
096  }
097
098  private static class NullInstance extends Configured implements Writable {
099    private Class<?> declaredClass;
100    public NullInstance() { super(null); }
101    public NullInstance(Class declaredClass, Configuration conf) {
102      super(conf);
103      this.declaredClass = declaredClass;
104    }
105    @Override
106    public void readFields(DataInput in) throws IOException {
107      String className = UTF8.readString(in);
108      declaredClass = PRIMITIVE_NAMES.get(className);
109      if (declaredClass == null) {
110        try {
111          declaredClass = getConf().getClassByName(className);
112        } catch (ClassNotFoundException e) {
113          throw new RuntimeException(e.toString());
114        }
115      }
116    }
117    @Override
118    public void write(DataOutput out) throws IOException {
119      UTF8.writeString(out, declaredClass.getName());
120    }
121  }
122
123  /** Write a {@link Writable}, {@link String}, primitive type, or an array of
124   * the preceding. */
125  public static void writeObject(DataOutput out, Object instance,
126                                 Class declaredClass, 
127                                 Configuration conf) throws IOException {
128    writeObject(out, instance, declaredClass, conf, false);
129  }
130  
131    /** 
132     * Write a {@link Writable}, {@link String}, primitive type, or an array of
133     * the preceding.  
134     * 
135     * @param allowCompactArrays - set true for RPC and internal or intra-cluster
136     * usages.  Set false for inter-cluster, File, and other persisted output 
137     * usages, to preserve the ability to interchange files with other clusters 
138     * that may not be running the same version of software.  Sometime in ~2013 
139     * we can consider removing this parameter and always using the compact format.
140     */
141    public static void writeObject(DataOutput out, Object instance,
142        Class declaredClass, Configuration conf, boolean allowCompactArrays) 
143    throws IOException {
144
145    if (instance == null) {                       // null
146      instance = new NullInstance(declaredClass, conf);
147      declaredClass = Writable.class;
148    }
149    
150    // Special case: must come before writing out the declaredClass.
151    // If this is an eligible array of primitives,
152    // wrap it in an ArrayPrimitiveWritable$Internal wrapper class.
153    if (allowCompactArrays && declaredClass.isArray()
154        && instance.getClass().getName().equals(declaredClass.getName())
155        && instance.getClass().getComponentType().isPrimitive()) {
156      instance = new ArrayPrimitiveWritable.Internal(instance);
157      declaredClass = ArrayPrimitiveWritable.Internal.class;
158    }
159
160    UTF8.writeString(out, declaredClass.getName()); // always write declared
161
162    if (declaredClass.isArray()) {     // non-primitive or non-compact array
163      int length = Array.getLength(instance);
164      out.writeInt(length);
165      for (int i = 0; i < length; i++) {
166        writeObject(out, Array.get(instance, i),
167            declaredClass.getComponentType(), conf, allowCompactArrays);
168      }
169      
170    } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
171      ((ArrayPrimitiveWritable.Internal) instance).write(out);
172      
173    } else if (declaredClass == String.class) {   // String
174      UTF8.writeString(out, (String)instance);
175      
176    } else if (declaredClass.isPrimitive()) {     // primitive type
177
178      if (declaredClass == Boolean.TYPE) {        // boolean
179        out.writeBoolean(((Boolean)instance).booleanValue());
180      } else if (declaredClass == Character.TYPE) { // char
181        out.writeChar(((Character)instance).charValue());
182      } else if (declaredClass == Byte.TYPE) {    // byte
183        out.writeByte(((Byte)instance).byteValue());
184      } else if (declaredClass == Short.TYPE) {   // short
185        out.writeShort(((Short)instance).shortValue());
186      } else if (declaredClass == Integer.TYPE) { // int
187        out.writeInt(((Integer)instance).intValue());
188      } else if (declaredClass == Long.TYPE) {    // long
189        out.writeLong(((Long)instance).longValue());
190      } else if (declaredClass == Float.TYPE) {   // float
191        out.writeFloat(((Float)instance).floatValue());
192      } else if (declaredClass == Double.TYPE) {  // double
193        out.writeDouble(((Double)instance).doubleValue());
194      } else if (declaredClass == Void.TYPE) {    // void
195      } else {
196        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
197      }
198    } else if (declaredClass.isEnum()) {         // enum
199      UTF8.writeString(out, ((Enum)instance).name());
200    } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
201      UTF8.writeString(out, instance.getClass().getName());
202      ((Writable)instance).write(out);
203
204    } else if (Message.class.isAssignableFrom(declaredClass)) {
205      ((Message)instance).writeDelimitedTo(
206          DataOutputOutputStream.constructOutputStream(out));
207    } else {
208      throw new IOException("Can't write: "+instance+" as "+declaredClass);
209    }
210  }
211  
212  
213  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
214   * the preceding. */
215  public static Object readObject(DataInput in, Configuration conf)
216    throws IOException {
217    return readObject(in, null, conf);
218  }
219    
220  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
221   * the preceding. */
222  @SuppressWarnings("unchecked")
223  public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
224    throws IOException {
225    String className = UTF8.readString(in);
226    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
227    if (declaredClass == null) {
228      declaredClass = loadClass(conf, className);
229    }
230    
231    Object instance;
232    
233    if (declaredClass.isPrimitive()) {            // primitive types
234
235      if (declaredClass == Boolean.TYPE) {             // boolean
236        instance = Boolean.valueOf(in.readBoolean());
237      } else if (declaredClass == Character.TYPE) {    // char
238        instance = Character.valueOf(in.readChar());
239      } else if (declaredClass == Byte.TYPE) {         // byte
240        instance = Byte.valueOf(in.readByte());
241      } else if (declaredClass == Short.TYPE) {        // short
242        instance = Short.valueOf(in.readShort());
243      } else if (declaredClass == Integer.TYPE) {      // int
244        instance = Integer.valueOf(in.readInt());
245      } else if (declaredClass == Long.TYPE) {         // long
246        instance = Long.valueOf(in.readLong());
247      } else if (declaredClass == Float.TYPE) {        // float
248        instance = Float.valueOf(in.readFloat());
249      } else if (declaredClass == Double.TYPE) {       // double
250        instance = Double.valueOf(in.readDouble());
251      } else if (declaredClass == Void.TYPE) {         // void
252        instance = null;
253      } else {
254        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
255      }
256
257    } else if (declaredClass.isArray()) {              // array
258      int length = in.readInt();
259      instance = Array.newInstance(declaredClass.getComponentType(), length);
260      for (int i = 0; i < length; i++) {
261        Array.set(instance, i, readObject(in, conf));
262      }
263      
264    } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
265      // Read and unwrap ArrayPrimitiveWritable$Internal array.
266      // Always allow the read, even if write is disabled by allowCompactArrays.
267      ArrayPrimitiveWritable.Internal temp = 
268          new ArrayPrimitiveWritable.Internal();
269      temp.readFields(in);
270      instance = temp.get();
271      declaredClass = instance.getClass();
272
273    } else if (declaredClass == String.class) {        // String
274      instance = UTF8.readString(in);
275    } else if (declaredClass.isEnum()) {         // enum
276      instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
277    } else if (Message.class.isAssignableFrom(declaredClass)) {
278      instance = tryInstantiateProtobuf(declaredClass, in);
279    } else {                                      // Writable
280      Class instanceClass = null;
281      String str = UTF8.readString(in);
282      instanceClass = loadClass(conf, str);
283      
284      Writable writable = WritableFactories.newInstance(instanceClass, conf);
285      writable.readFields(in);
286      instance = writable;
287
288      if (instanceClass == NullInstance.class) {  // null
289        declaredClass = ((NullInstance)instance).declaredClass;
290        instance = null;
291      }
292    }
293
294    if (objectWritable != null) {                 // store values
295      objectWritable.declaredClass = declaredClass;
296      objectWritable.instance = instance;
297    }
298
299    return instance;
300      
301  }
302
303  /**
304   * Try to instantiate a protocol buffer of the given message class
305   * from the given input stream.
306   * 
307   * @param protoClass the class of the generated protocol buffer
308   * @param dataIn the input stream to read from
309   * @return the instantiated Message instance
310   * @throws IOException if an IO problem occurs
311   */
312  private static Message tryInstantiateProtobuf(
313      Class<?> protoClass,
314      DataInput dataIn) throws IOException {
315
316    try {
317      if (dataIn instanceof InputStream) {
318        // We can use the built-in parseDelimitedFrom and not have to re-copy
319        // the data
320        Method parseMethod = getStaticProtobufMethod(protoClass,
321            "parseDelimitedFrom", InputStream.class);
322        return (Message)parseMethod.invoke(null, (InputStream)dataIn);
323      } else {
324        // Have to read it into a buffer first, since protobuf doesn't deal
325        // with the DataInput interface directly.
326        
327        // Read the size delimiter that writeDelimitedTo writes
328        int size = ProtoUtil.readRawVarint32(dataIn);
329        if (size < 0) {
330          throw new IOException("Invalid size: " + size);
331        }
332      
333        byte[] data = new byte[size];
334        dataIn.readFully(data);
335        Method parseMethod = getStaticProtobufMethod(protoClass,
336            "parseFrom", byte[].class);
337        return (Message)parseMethod.invoke(null, data);
338      }
339    } catch (InvocationTargetException e) {
340      
341      if (e.getCause() instanceof IOException) {
342        throw (IOException)e.getCause();
343      } else {
344        throw new IOException(e.getCause());
345      }
346    } catch (IllegalAccessException iae) {
347      throw new AssertionError("Could not access parse method in " +
348          protoClass);
349    }
350  }
351
352  static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
353      Class<?> ... args) {
354
355    try {
356      return declaredClass.getMethod(method, args);
357    } catch (Exception e) {
358      // This is a bug in Hadoop - protobufs should all have this static method
359      throw new AssertionError("Protocol buffer class " + declaredClass +
360          " does not have an accessible parseFrom(InputStream) method!");
361    }
362  }
363
364  /**
365   * Find and load the class with given name <tt>className</tt> by first finding
366   * it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
367   * try load it directly.
368   */
369  public static Class<?> loadClass(Configuration conf, String className) {
370    Class<?> declaredClass = null;
371    try {
372      if (conf != null)
373        declaredClass = conf.getClassByName(className);
374      else
375        declaredClass = Class.forName(className);
376    } catch (ClassNotFoundException e) {
377      throw new RuntimeException("readObject can't find class " + className,
378          e);
379    }
380    return declaredClass;
381  }
382
383  @Override
384  public void setConf(Configuration conf) {
385    this.conf = conf;
386  }
387
388  @Override
389  public Configuration getConf() {
390    return this.conf;
391  }
392  
393}