/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import java.io.*;
import java.util.*;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ProtoUtil;

import com.google.protobuf.Message;

/** A polymorphic Writable that writes an instance with its class name.
 * Handles arrays, strings, enums, protocol buffer messages and primitive
 * types without a Writable wrapper.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ObjectWritable implements Writable, Configurable {

  private Class declaredClass;
  private Object instance;
  private Configuration conf;

  public ObjectWritable() {}

  /**
   * Wrap the given instance, using its runtime class as the declared class.
   *
   * @param instance the object to wrap; must not be null because its class
   *                 is taken via {@code instance.getClass()}
   */
  public ObjectWritable(Object instance) {
    set(instance);
  }

  /**
   * Wrap the given instance under an explicitly declared class.
   *
   * @param declaredClass the class the instance is declared as
   * @param instance the object to wrap
   */
  public ObjectWritable(Class declaredClass, Object instance) {
    this.declaredClass = declaredClass;
    this.instance = instance;
  }

  /** Return the instance, or null if none. */
  public Object get() { return instance; }

  /** Return the class this is meant to be. */
  public Class getDeclaredClass() { return declaredClass; }

  /**
   * Reset the instance. The declared class becomes the runtime class of
   * the given instance.
   *
   * @param instance the new object to wrap; must not be null
   */
  public void set(Object instance) {
    this.declaredClass = instance.getClass();
    this.instance = instance;
  }

  @Override
  public String toString() {
    return "OW[class=" + declaredClass + ",value=" + instance + "]";
  }


  /** Populate this object from <tt>in</tt> using {@link #readObject}. */
  @Override
  public void readFields(DataInput in) throws IOException {
    readObject(in, this, this.conf);
  }

  /** Serialize the wrapped instance to <tt>out</tt> using
   * {@link #writeObject(DataOutput, Object, Class, Configuration)}. */
  @Override
  public void write(DataOutput out) throws IOException {
    writeObject(out, instance, declaredClass, conf);
  }

  /** Maps the names of the primitive types (and "void") to their classes;
   * these names cannot be resolved through a class loader lookup. */
  private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
  static {
    PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
    PRIMITIVE_NAMES.put("byte", Byte.TYPE);
    PRIMITIVE_NAMES.put("char", Character.TYPE);
    PRIMITIVE_NAMES.put("short", Short.TYPE);
    PRIMITIVE_NAMES.put("int", Integer.TYPE);
    PRIMITIVE_NAMES.put("long", Long.TYPE);
    PRIMITIVE_NAMES.put("float", Float.TYPE);
    PRIMITIVE_NAMES.put("double", Double.TYPE);
    PRIMITIVE_NAMES.put("void", Void.TYPE);
  }

  /** Stand-in written in place of a null instance; it records only the
   * declared class of the null value. */
  private static class NullInstance extends Configured implements Writable {
    private Class<?> declaredClass;
    public NullInstance() { super(null); }
    public NullInstance(Class declaredClass, Configuration conf) {
      super(conf);
      this.declaredClass = declaredClass;
    }

    /**
     * Read the declared class name and resolve it to a class.
     *
     * @throws RuntimeException if the named class cannot be found
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      String className = UTF8.readString(in);
      declaredClass = PRIMITIVE_NAMES.get(className);
      if (declaredClass == null) {
        // The no-arg constructor passes a null configuration, so guard
        // against it here the same way loadClass() does.
        Configuration conf = getConf();
        try {
          if (conf != null) {
            declaredClass = conf.getClassByName(className);
          } else {
            declaredClass = Class.forName(className);
          }
        } catch (ClassNotFoundException e) {
          // Same message as before, but keep the original exception as cause.
          throw new RuntimeException(e.toString(), e);
        }
      }
    }

    /** Write the name of the declared class. */
    @Override
    public void write(DataOutput out) throws IOException {
      UTF8.writeString(out, declaredClass.getName());
    }
  }

  /** Write a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding. Compact primitive arrays are not used. */
  public static void writeObject(DataOutput out, Object instance,
                                 Class declaredClass,
                                 Configuration conf) throws IOException {
    writeObject(out, instance, declaredClass, conf, false);
  }

  /**
   * Write a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding. Enums and protocol buffer {@link Message}s are also
   * accepted.
   *
   * @param out the stream to write to
   * @param instance the value to write; may be null
   * @param declaredClass the class the value is declared as
   * @param conf configuration handed to the null placeholder
   * @param allowCompactArrays - set true for RPC and internal or intra-cluster
   * usages.  Set false for inter-cluster, File, and other persisted output
   * usages, to preserve the ability to interchange files with other clusters
   * that may not be running the same version of software.  Sometime in ~2013
   * we can consider removing this parameter and always using the compact format.
   * @throws IOException if the value cannot be written as the declared class
   */
  public static void writeObject(DataOutput out, Object instance,
      Class declaredClass, Configuration conf, boolean allowCompactArrays)
      throws IOException {

    if (instance == null) {                       // null
      instance = new NullInstance(declaredClass, conf);
      declaredClass = Writable.class;
    }

    // Special case: must come before writing out the declaredClass.
    // If this is an eligible array of primitives,
    // wrap it in an ArrayPrimitiveWritable$Internal wrapper class.
    if (allowCompactArrays && declaredClass.isArray()
        && instance.getClass().getName().equals(declaredClass.getName())
        && instance.getClass().getComponentType().isPrimitive()) {
      instance = new ArrayPrimitiveWritable.Internal(instance);
      declaredClass = ArrayPrimitiveWritable.Internal.class;
    }

    UTF8.writeString(out, declaredClass.getName()); // always write declared

    if (declaredClass.isArray()) {     // non-primitive or non-compact array
      int length = Array.getLength(instance);
      out.writeInt(length);
      for (int i = 0; i < length; i++) {
        writeObject(out, Array.get(instance, i),
            declaredClass.getComponentType(), conf, allowCompactArrays);
      }

    } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
      ((ArrayPrimitiveWritable.Internal) instance).write(out);

    } else if (declaredClass == String.class) {   // String
      UTF8.writeString(out, (String)instance);

    } else if (declaredClass.isPrimitive()) {     // primitive type

      if (declaredClass == Boolean.TYPE) {        // boolean
        out.writeBoolean(((Boolean)instance).booleanValue());
      } else if (declaredClass == Character.TYPE) { // char
        out.writeChar(((Character)instance).charValue());
      } else if (declaredClass == Byte.TYPE) {    // byte
        out.writeByte(((Byte)instance).byteValue());
      } else if (declaredClass == Short.TYPE) {   // short
        out.writeShort(((Short)instance).shortValue());
      } else if (declaredClass == Integer.TYPE) { // int
        out.writeInt(((Integer)instance).intValue());
      } else if (declaredClass == Long.TYPE) {    // long
        out.writeLong(((Long)instance).longValue());
      } else if (declaredClass == Float.TYPE) {   // float
        out.writeFloat(((Float)instance).floatValue());
      } else if (declaredClass == Double.TYPE) {  // double
        out.writeDouble(((Double)instance).doubleValue());
      } else if (declaredClass == Void.TYPE) {    // void
        // nothing to write for void
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }
    } else if (declaredClass.isEnum()) {         // enum
      UTF8.writeString(out, ((Enum)instance).name());
    } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
      // The runtime class is written too, since it may differ from the
      // declared class (this is also how the null placeholder is detected).
      UTF8.writeString(out, instance.getClass().getName());
      ((Writable)instance).write(out);

    } else if (Message.class.isAssignableFrom(declaredClass)) {
      ((Message)instance).writeDelimitedTo(
          DataOutputOutputStream.constructOutputStream(out));
    } else {
      throw new IOException("Can't write: "+instance+" as "+declaredClass);
    }
  }


  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding. */
  public static Object readObject(DataInput in, Configuration conf)
    throws IOException {
    return readObject(in, null, conf);
  }

  /**
   * Read a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding.
   *
   * @param in the stream to read from
   * @param objectWritable if non-null, receives the declared class and the
   *        instance that were read
   * @param conf configuration used to resolve class names; may be null
   * @return the instance read, or null if a null value was written
   * @throws IOException if an IO problem occurs or the stream carries a
   *         negative array length
   */
  @SuppressWarnings("unchecked")
  public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
    throws IOException {
    String className = UTF8.readString(in);
    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
    if (declaredClass == null) {
      declaredClass = loadClass(conf, className);
    }

    Object instance;

    if (declaredClass.isPrimitive()) {            // primitive types

      if (declaredClass == Boolean.TYPE) {             // boolean
        instance = Boolean.valueOf(in.readBoolean());
      } else if (declaredClass == Character.TYPE) {    // char
        instance = Character.valueOf(in.readChar());
      } else if (declaredClass == Byte.TYPE) {         // byte
        instance = Byte.valueOf(in.readByte());
      } else if (declaredClass == Short.TYPE) {        // short
        instance = Short.valueOf(in.readShort());
      } else if (declaredClass == Integer.TYPE) {      // int
        instance = Integer.valueOf(in.readInt());
      } else if (declaredClass == Long.TYPE) {         // long
        instance = Long.valueOf(in.readLong());
      } else if (declaredClass == Float.TYPE) {        // float
        instance = Float.valueOf(in.readFloat());
      } else if (declaredClass == Double.TYPE) {       // double
        instance = Double.valueOf(in.readDouble());
      } else if (declaredClass == Void.TYPE) {         // void
        instance = null;
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }

    } else if (declaredClass.isArray()) {              // array
      int length = in.readInt();
      if (length < 0) {
        // Report corrupt input as an IOException, like the protobuf size
        // check, instead of a NegativeArraySizeException from newInstance.
        throw new IOException("Invalid array length: " + length);
      }
      instance = Array.newInstance(declaredClass.getComponentType(), length);
      for (int i = 0; i < length; i++) {
        Array.set(instance, i, readObject(in, conf));
      }

    } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
      // Read and unwrap ArrayPrimitiveWritable$Internal array.
      // Always allow the read, even if write is disabled by allowCompactArrays.
      ArrayPrimitiveWritable.Internal temp =
          new ArrayPrimitiveWritable.Internal();
      temp.readFields(in);
      instance = temp.get();
      declaredClass = instance.getClass();

    } else if (declaredClass == String.class) {        // String
      instance = UTF8.readString(in);
    } else if (declaredClass.isEnum()) {         // enum
      instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
    } else if (Message.class.isAssignableFrom(declaredClass)) {
      instance = tryInstantiateProtobuf(declaredClass, in);
    } else {                                      // Writable
      // The runtime class name follows the declared class name.
      String str = UTF8.readString(in);
      Class instanceClass = loadClass(conf, str);

      Writable writable = WritableFactories.newInstance(instanceClass, conf);
      writable.readFields(in);
      instance = writable;

      if (instanceClass == NullInstance.class) {  // null
        declaredClass = ((NullInstance)instance).declaredClass;
        instance = null;
      }
    }

    if (objectWritable != null) {                 // store values
      objectWritable.declaredClass = declaredClass;
      objectWritable.instance = instance;
    }

    return instance;

  }

  /**
   * Try to instantiate a protocol buffer of the given message class
   * from the given input stream.
   *
   * @param protoClass the class of the generated protocol buffer
   * @param dataIn the input stream to read from
   * @return the instantiated Message instance
   * @throws IOException if an IO problem occurs
   */
  private static Message tryInstantiateProtobuf(
      Class<?> protoClass,
      DataInput dataIn) throws IOException {

    try {
      if (dataIn instanceof InputStream) {
        // We can use the built-in parseDelimitedFrom and not have to re-copy
        // the data
        Method parseMethod = getStaticProtobufMethod(protoClass,
            "parseDelimitedFrom", InputStream.class);
        return (Message)parseMethod.invoke(null, (InputStream)dataIn);
      } else {
        // Have to read it into a buffer first, since protobuf doesn't deal
        // with the DataInput interface directly.

        // Read the size delimiter that writeDelimitedTo writes
        int size = ProtoUtil.readRawVarint32(dataIn);
        if (size < 0) {
          throw new IOException("Invalid size: " + size);
        }

        byte[] data = new byte[size];
        dataIn.readFully(data);
        Method parseMethod = getStaticProtobufMethod(protoClass,
            "parseFrom", byte[].class);
        return (Message)parseMethod.invoke(null, data);
      }
    } catch (InvocationTargetException e) {

      if (e.getCause() instanceof IOException) {
        throw (IOException)e.getCause();
      } else {
        throw new IOException(e.getCause());
      }
    } catch (IllegalAccessException iae) {
      // initCause keeps the underlying failure without needing the
      // (String, Throwable) constructor.
      AssertionError err = new AssertionError(
          "Could not access parse method in " + protoClass);
      err.initCause(iae);
      throw err;
    }
  }

  /**
   * Look up a public method on a generated protocol buffer class.
   *
   * @param declaredClass the protocol buffer class to search
   * @param method the name of the method to find
   * @param args the parameter types of the method
   * @return the matching method
   * @throws AssertionError if the lookup fails for any reason
   */
  static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
      Class<?> ... args) {

    try {
      return declaredClass.getMethod(method, args);
    } catch (Exception e) {
      // This is a bug in Hadoop - protobufs should all have this static method.
      // Name the signature that was actually requested, not a fixed one.
      StringBuilder signature = new StringBuilder().append(method).append('(');
      if (args != null) {
        for (int i = 0; i < args.length; i++) {
          if (i > 0) {
            signature.append(", ");
          }
          signature.append(args[i] == null ? "null" : args[i].getSimpleName());
        }
      }
      signature.append(')');

      AssertionError err = new AssertionError("Protocol buffer class "
          + declaredClass + " does not have an accessible " + signature
          + " method!");
      err.initCause(e);
      throw err;
    }
  }

  /**
   * Find and load the class with given name <tt>className</tt> by first finding
   * it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
   * try load it directly.
   *
   * @param conf the configuration to resolve the class through, or null
   * @param className the name of the class to load
   * @return the loaded class
   * @throws RuntimeException if the class cannot be found
   */
  public static Class<?> loadClass(Configuration conf, String className) {
    Class<?> declaredClass = null;
    try {
      if (conf != null) {
        declaredClass = conf.getClassByName(className);
      } else {
        declaredClass = Class.forName(className);
      }
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("readObject can't find class " + className,
          e);
    }
    return declaredClass;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

}