001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.lang.reflect.Array;
022import java.lang.reflect.InvocationTargetException;
023import java.lang.reflect.Method;
024
025import java.io.*;
026import java.util.*;
027
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.*;
031import org.apache.hadoop.util.ProtoUtil;
032
033import com.google.protobuf.Message;
034
035/** A polymorphic Writable that writes an instance with it's class name.
036 * Handles arrays, strings and primitive types without a Writable wrapper.
037 */
038@InterfaceAudience.Public
039@InterfaceStability.Stable
040public class ObjectWritable implements Writable, Configurable {
041
042  private Class declaredClass;
043  private Object instance;
044  private Configuration conf;
045
046  public ObjectWritable() {}
047  
048  public ObjectWritable(Object instance) {
049    set(instance);
050  }
051
052  public ObjectWritable(Class declaredClass, Object instance) {
053    this.declaredClass = declaredClass;
054    this.instance = instance;
055  }
056
057  /** Return the instance, or null if none. */
058  public Object get() { return instance; }
059  
060  /** Return the class this is meant to be. */
061  public Class getDeclaredClass() { return declaredClass; }
062  
063  /** Reset the instance. */
064  public void set(Object instance) {
065    this.declaredClass = instance.getClass();
066    this.instance = instance;
067  }
068  
069  public String toString() {
070    return "OW[class=" + declaredClass + ",value=" + instance + "]";
071  }
072
073  
074  public void readFields(DataInput in) throws IOException {
075    readObject(in, this, this.conf);
076  }
077  
078  public void write(DataOutput out) throws IOException {
079    writeObject(out, instance, declaredClass, conf);
080  }
081
082  private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
083  static {
084    PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
085    PRIMITIVE_NAMES.put("byte", Byte.TYPE);
086    PRIMITIVE_NAMES.put("char", Character.TYPE);
087    PRIMITIVE_NAMES.put("short", Short.TYPE);
088    PRIMITIVE_NAMES.put("int", Integer.TYPE);
089    PRIMITIVE_NAMES.put("long", Long.TYPE);
090    PRIMITIVE_NAMES.put("float", Float.TYPE);
091    PRIMITIVE_NAMES.put("double", Double.TYPE);
092    PRIMITIVE_NAMES.put("void", Void.TYPE);
093  }
094
095  private static class NullInstance extends Configured implements Writable {
096    private Class<?> declaredClass;
097    public NullInstance() { super(null); }
098    public NullInstance(Class declaredClass, Configuration conf) {
099      super(conf);
100      this.declaredClass = declaredClass;
101    }
102    public void readFields(DataInput in) throws IOException {
103      String className = UTF8.readString(in);
104      declaredClass = PRIMITIVE_NAMES.get(className);
105      if (declaredClass == null) {
106        try {
107          declaredClass = getConf().getClassByName(className);
108        } catch (ClassNotFoundException e) {
109          throw new RuntimeException(e.toString());
110        }
111      }
112    }
113    public void write(DataOutput out) throws IOException {
114      UTF8.writeString(out, declaredClass.getName());
115    }
116  }
117
118  /** Write a {@link Writable}, {@link String}, primitive type, or an array of
119   * the preceding. */
120  public static void writeObject(DataOutput out, Object instance,
121                                 Class declaredClass, 
122                                 Configuration conf) throws IOException {
123    writeObject(out, instance, declaredClass, conf, false);
124  }
125  
126    /** 
127     * Write a {@link Writable}, {@link String}, primitive type, or an array of
128     * the preceding.  
129     * 
130     * @param allowCompactArrays - set true for RPC and internal or intra-cluster
131     * usages.  Set false for inter-cluster, File, and other persisted output 
132     * usages, to preserve the ability to interchange files with other clusters 
133     * that may not be running the same version of software.  Sometime in ~2013 
134     * we can consider removing this parameter and always using the compact format.
135     */
136    public static void writeObject(DataOutput out, Object instance,
137        Class declaredClass, Configuration conf, boolean allowCompactArrays) 
138    throws IOException {
139
140    if (instance == null) {                       // null
141      instance = new NullInstance(declaredClass, conf);
142      declaredClass = Writable.class;
143    }
144    
145    // Special case: must come before writing out the declaredClass.
146    // If this is an eligible array of primitives,
147    // wrap it in an ArrayPrimitiveWritable$Internal wrapper class.
148    if (allowCompactArrays && declaredClass.isArray()
149        && instance.getClass().getName().equals(declaredClass.getName())
150        && instance.getClass().getComponentType().isPrimitive()) {
151      instance = new ArrayPrimitiveWritable.Internal(instance);
152      declaredClass = ArrayPrimitiveWritable.Internal.class;
153    }
154
155    UTF8.writeString(out, declaredClass.getName()); // always write declared
156
157    if (declaredClass.isArray()) {     // non-primitive or non-compact array
158      int length = Array.getLength(instance);
159      out.writeInt(length);
160      for (int i = 0; i < length; i++) {
161        writeObject(out, Array.get(instance, i),
162            declaredClass.getComponentType(), conf, allowCompactArrays);
163      }
164      
165    } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
166      ((ArrayPrimitiveWritable.Internal) instance).write(out);
167      
168    } else if (declaredClass == String.class) {   // String
169      UTF8.writeString(out, (String)instance);
170      
171    } else if (declaredClass.isPrimitive()) {     // primitive type
172
173      if (declaredClass == Boolean.TYPE) {        // boolean
174        out.writeBoolean(((Boolean)instance).booleanValue());
175      } else if (declaredClass == Character.TYPE) { // char
176        out.writeChar(((Character)instance).charValue());
177      } else if (declaredClass == Byte.TYPE) {    // byte
178        out.writeByte(((Byte)instance).byteValue());
179      } else if (declaredClass == Short.TYPE) {   // short
180        out.writeShort(((Short)instance).shortValue());
181      } else if (declaredClass == Integer.TYPE) { // int
182        out.writeInt(((Integer)instance).intValue());
183      } else if (declaredClass == Long.TYPE) {    // long
184        out.writeLong(((Long)instance).longValue());
185      } else if (declaredClass == Float.TYPE) {   // float
186        out.writeFloat(((Float)instance).floatValue());
187      } else if (declaredClass == Double.TYPE) {  // double
188        out.writeDouble(((Double)instance).doubleValue());
189      } else if (declaredClass == Void.TYPE) {    // void
190      } else {
191        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
192      }
193    } else if (declaredClass.isEnum()) {         // enum
194      UTF8.writeString(out, ((Enum)instance).name());
195    } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
196      UTF8.writeString(out, instance.getClass().getName());
197      ((Writable)instance).write(out);
198
199    } else if (Message.class.isAssignableFrom(declaredClass)) {
200      ((Message)instance).writeDelimitedTo(
201          DataOutputOutputStream.constructOutputStream(out));
202    } else {
203      throw new IOException("Can't write: "+instance+" as "+declaredClass);
204    }
205  }
206  
207  
208  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
209   * the preceding. */
210  public static Object readObject(DataInput in, Configuration conf)
211    throws IOException {
212    return readObject(in, null, conf);
213  }
214    
215  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
216   * the preceding. */
217  @SuppressWarnings("unchecked")
218  public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
219    throws IOException {
220    String className = UTF8.readString(in);
221    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
222    if (declaredClass == null) {
223      declaredClass = loadClass(conf, className);
224    }
225    
226    Object instance;
227    
228    if (declaredClass.isPrimitive()) {            // primitive types
229
230      if (declaredClass == Boolean.TYPE) {             // boolean
231        instance = Boolean.valueOf(in.readBoolean());
232      } else if (declaredClass == Character.TYPE) {    // char
233        instance = Character.valueOf(in.readChar());
234      } else if (declaredClass == Byte.TYPE) {         // byte
235        instance = Byte.valueOf(in.readByte());
236      } else if (declaredClass == Short.TYPE) {        // short
237        instance = Short.valueOf(in.readShort());
238      } else if (declaredClass == Integer.TYPE) {      // int
239        instance = Integer.valueOf(in.readInt());
240      } else if (declaredClass == Long.TYPE) {         // long
241        instance = Long.valueOf(in.readLong());
242      } else if (declaredClass == Float.TYPE) {        // float
243        instance = Float.valueOf(in.readFloat());
244      } else if (declaredClass == Double.TYPE) {       // double
245        instance = Double.valueOf(in.readDouble());
246      } else if (declaredClass == Void.TYPE) {         // void
247        instance = null;
248      } else {
249        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
250      }
251
252    } else if (declaredClass.isArray()) {              // array
253      int length = in.readInt();
254      instance = Array.newInstance(declaredClass.getComponentType(), length);
255      for (int i = 0; i < length; i++) {
256        Array.set(instance, i, readObject(in, conf));
257      }
258      
259    } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
260      // Read and unwrap ArrayPrimitiveWritable$Internal array.
261      // Always allow the read, even if write is disabled by allowCompactArrays.
262      ArrayPrimitiveWritable.Internal temp = 
263          new ArrayPrimitiveWritable.Internal();
264      temp.readFields(in);
265      instance = temp.get();
266      declaredClass = instance.getClass();
267
268    } else if (declaredClass == String.class) {        // String
269      instance = UTF8.readString(in);
270    } else if (declaredClass.isEnum()) {         // enum
271      instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
272    } else if (Message.class.isAssignableFrom(declaredClass)) {
273      instance = tryInstantiateProtobuf(declaredClass, in);
274    } else {                                      // Writable
275      Class instanceClass = null;
276      String str = UTF8.readString(in);
277      instanceClass = loadClass(conf, str);
278      
279      Writable writable = WritableFactories.newInstance(instanceClass, conf);
280      writable.readFields(in);
281      instance = writable;
282
283      if (instanceClass == NullInstance.class) {  // null
284        declaredClass = ((NullInstance)instance).declaredClass;
285        instance = null;
286      }
287    }
288
289    if (objectWritable != null) {                 // store values
290      objectWritable.declaredClass = declaredClass;
291      objectWritable.instance = instance;
292    }
293
294    return instance;
295      
296  }
297
298  /**
299   * Try to instantiate a protocol buffer of the given message class
300   * from the given input stream.
301   * 
302   * @param protoClass the class of the generated protocol buffer
303   * @param dataIn the input stream to read from
304   * @return the instantiated Message instance
305   * @throws IOException if an IO problem occurs
306   */
307  private static Message tryInstantiateProtobuf(
308      Class<?> protoClass,
309      DataInput dataIn) throws IOException {
310
311    try {
312      if (dataIn instanceof InputStream) {
313        // We can use the built-in parseDelimitedFrom and not have to re-copy
314        // the data
315        Method parseMethod = getStaticProtobufMethod(protoClass,
316            "parseDelimitedFrom", InputStream.class);
317        return (Message)parseMethod.invoke(null, (InputStream)dataIn);
318      } else {
319        // Have to read it into a buffer first, since protobuf doesn't deal
320        // with the DataInput interface directly.
321        
322        // Read the size delimiter that writeDelimitedTo writes
323        int size = ProtoUtil.readRawVarint32(dataIn);
324        if (size < 0) {
325          throw new IOException("Invalid size: " + size);
326        }
327      
328        byte[] data = new byte[size];
329        dataIn.readFully(data);
330        Method parseMethod = getStaticProtobufMethod(protoClass,
331            "parseFrom", byte[].class);
332        return (Message)parseMethod.invoke(null, data);
333      }
334    } catch (InvocationTargetException e) {
335      
336      if (e.getCause() instanceof IOException) {
337        throw (IOException)e.getCause();
338      } else {
339        throw new IOException(e.getCause());
340      }
341    } catch (IllegalAccessException iae) {
342      throw new AssertionError("Could not access parse method in " +
343          protoClass);
344    }
345  }
346
347  static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
348      Class<?> ... args) {
349
350    try {
351      return declaredClass.getMethod(method, args);
352    } catch (Exception e) {
353      // This is a bug in Hadoop - protobufs should all have this static method
354      throw new AssertionError("Protocol buffer class " + declaredClass +
355          " does not have an accessible parseFrom(InputStream) method!");
356    }
357  }
358
359  /**
360   * Find and load the class with given name <tt>className</tt> by first finding
361   * it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
362   * try load it directly.
363   */
364  public static Class<?> loadClass(Configuration conf, String className) {
365    Class<?> declaredClass = null;
366    try {
367      if (conf != null)
368        declaredClass = conf.getClassByName(className);
369      else
370        declaredClass = Class.forName(className);
371    } catch (ClassNotFoundException e) {
372      throw new RuntimeException("readObject can't find class " + className,
373          e);
374    }
375    return declaredClass;
376  }
377
378  public void setConf(Configuration conf) {
379    this.conf = conf;
380  }
381
382  public Configuration getConf() {
383    return this.conf;
384  }
385  
386}