/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import java.io.*;
import java.util.*;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ProtoUtil;

import com.google.protobuf.Message;

/** A polymorphic Writable that writes an instance with its class name.
 * Handles arrays, strings and primitive types without a Writable wrapper.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ObjectWritable implements Writable, Configurable {

  /** The class the wrapped value is declared as (not necessarily its runtime class). */
  private Class declaredClass;
  /** The wrapped value; may be null. */
  private Object instance;
  /** Configuration handed to the static read/write helpers; may be null. */
  private Configuration conf;

  /** Create an empty wrapper; fields are filled by {@link #set} or {@link #readFields}. */
  public ObjectWritable() {}

  /**
   * Wrap the given instance, using its runtime class as the declared class.
   *
   * @param instance the value to wrap; must not be null (see {@link #set})
   */
  public ObjectWritable(Object instance) {
    set(instance);
  }

  /**
   * Wrap the given instance under an explicitly declared class.
   *
   * @param declaredClass the class the value is declared as
   * @param instance the value to wrap
   */
  public ObjectWritable(Class declaredClass, Object instance) {
    this.declaredClass = declaredClass;
    this.instance = instance;
  }

  /** Return the instance, or null if none. */
  public Object get() { return instance; }

  /** Return the class this is meant to be. */
  public Class getDeclaredClass() { return declaredClass; }

  /**
   * Reset the instance. The declared class is taken from
   * <code>instance.getClass()</code>, so a null argument results in a
   * {@link NullPointerException}.
   */
  public void set(Object instance) {
    this.declaredClass = instance.getClass();
    this.instance = instance;
  }

  @Override
  public String toString() {
    return "OW[class=" + declaredClass + ",value=" + instance + "]";
  }


  /** Read the declared class and instance from <code>in</code> into this object. */
  @Override
  public void readFields(DataInput in) throws IOException {
    readObject(in, this, this.conf);
  }

  /**
   * Write the wrapped instance to <code>out</code> via the four-argument
   * {@link #writeObject(DataOutput, Object, Class, Configuration)}, i.e.
   * without the compact array format.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    writeObject(out, instance, declaredClass, conf);
  }

  /**
   * Primitive type names mapped to their <code>Class</code> objects. Consulted
   * before falling back to loading a class by name.
   */
  private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
  static {
    PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
    PRIMITIVE_NAMES.put("byte", Byte.TYPE);
    PRIMITIVE_NAMES.put("char", Character.TYPE);
    PRIMITIVE_NAMES.put("short", Short.TYPE);
    PRIMITIVE_NAMES.put("int", Integer.TYPE);
    PRIMITIVE_NAMES.put("long", Long.TYPE);
    PRIMITIVE_NAMES.put("float", Float.TYPE);
    PRIMITIVE_NAMES.put("double", Double.TYPE);
    PRIMITIVE_NAMES.put("void", Void.TYPE);
  }

  /**
   * Stand-in that {@link #writeObject} serializes in place of a null instance.
   * It carries only the name of the declared class so that
   * {@link #readObject} can restore it.
   */
  private static class NullInstance extends Configured implements Writable {
    private Class<?> declaredClass;
    public NullInstance() { super(null); }
    public NullInstance(Class declaredClass, Configuration conf) {
      super(conf);
      this.declaredClass = declaredClass;
    }
    /**
     * Read the declared class name and resolve it, first against the
     * primitive names and then through the configuration.
     *
     * @throws RuntimeException if the named class cannot be found; the
     *         {@link ClassNotFoundException} is attached as the cause
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      String className = UTF8.readString(in);
      declaredClass = PRIMITIVE_NAMES.get(className);
      if (declaredClass == null) {
        try {
          declaredClass = getConf().getClassByName(className);
        } catch (ClassNotFoundException e) {
          // Same message as before, but keep the original exception as the
          // cause so the stack trace of the failed lookup is not lost.
          throw new RuntimeException(e.toString(), e);
        }
      }
    }
    /** Write the name of the declared class. */
    @Override
    public void write(DataOutput out) throws IOException {
      UTF8.writeString(out, declaredClass.getName());
    }
  }

  /** Write a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding. Arrays are never written in the compact format. */
  public static void writeObject(DataOutput out, Object instance,
                                 Class declaredClass,
                                 Configuration conf) throws IOException {
    writeObject(out, instance, declaredClass, conf, false);
  }

  /**
   * Write a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding.
   *
   * The declared class name is always written first, followed by a payload
   * whose layout depends on the declared class.
   *
   * @param out the stream to write to
   * @param instance the value to write; null is written as a
   *        <code>NullInstance</code> placeholder
   * @param declaredClass the class the value is declared as
   * @param conf configuration passed to the null placeholder
   * @param allowCompactArrays - set true for RPC and internal or intra-cluster
   * usages.  Set false for inter-cluster, File, and other persisted output
   * usages, to preserve the ability to interchange files with other clusters
   * that may not be running the same version of software.  Sometime in ~2013
   * we can consider removing this parameter and always using the compact format.
   * @throws IOException if the declared class is not one of the supported
   *         kinds, or if the underlying stream fails
   */
  public static void writeObject(DataOutput out, Object instance,
      Class declaredClass, Configuration conf, boolean allowCompactArrays)
  throws IOException {

    if (instance == null) {                       // null
      instance = new NullInstance(declaredClass, conf);
      declaredClass = Writable.class;
    }

    // Special case: must come before writing out the declaredClass.
    // If this is an eligible array of primitives,
    // wrap it in an ArrayPrimitiveWritable$Internal wrapper class.
    if (allowCompactArrays && declaredClass.isArray()
        && instance.getClass().getName().equals(declaredClass.getName())
        && instance.getClass().getComponentType().isPrimitive()) {
      instance = new ArrayPrimitiveWritable.Internal(instance);
      declaredClass = ArrayPrimitiveWritable.Internal.class;
    }

    UTF8.writeString(out, declaredClass.getName()); // always write declared

    if (declaredClass.isArray()) {     // non-primitive or non-compact array
      int length = Array.getLength(instance);
      out.writeInt(length);
      for (int i = 0; i < length; i++) {
        // Each element is written recursively under the component type.
        writeObject(out, Array.get(instance, i),
            declaredClass.getComponentType(), conf, allowCompactArrays);
      }

    } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
      ((ArrayPrimitiveWritable.Internal) instance).write(out);

    } else if (declaredClass == String.class) {   // String
      UTF8.writeString(out, (String)instance);

    } else if (declaredClass.isPrimitive()) {     // primitive type

      if (declaredClass == Boolean.TYPE) {        // boolean
        out.writeBoolean(((Boolean)instance).booleanValue());
      } else if (declaredClass == Character.TYPE) { // char
        out.writeChar(((Character)instance).charValue());
      } else if (declaredClass == Byte.TYPE) {    // byte
        out.writeByte(((Byte)instance).byteValue());
      } else if (declaredClass == Short.TYPE) {   // short
        out.writeShort(((Short)instance).shortValue());
      } else if (declaredClass == Integer.TYPE) { // int
        out.writeInt(((Integer)instance).intValue());
      } else if (declaredClass == Long.TYPE) {    // long
        out.writeLong(((Long)instance).longValue());
      } else if (declaredClass == Float.TYPE) {   // float
        out.writeFloat(((Float)instance).floatValue());
      } else if (declaredClass == Double.TYPE) {  // double
        out.writeDouble(((Double)instance).doubleValue());
      } else if (declaredClass == Void.TYPE) {    // void
        // Nothing to write beyond the class name.
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }
    } else if (declaredClass.isEnum()) {         // enum
      UTF8.writeString(out, ((Enum)instance).name());
    } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
      // The runtime class name is written as well, since it may differ from
      // the declared class; the reader instantiates the runtime class.
      UTF8.writeString(out, instance.getClass().getName());
      ((Writable)instance).write(out);

    } else if (Message.class.isAssignableFrom(declaredClass)) {
      ((Message)instance).writeDelimitedTo(
          DataOutputOutputStream.constructOutputStream(out));
    } else {
      throw new IOException("Can't write: "+instance+" as "+declaredClass);
    }
  }


  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding. */
  public static Object readObject(DataInput in, Configuration conf)
    throws IOException {
    return readObject(in, null, conf);
  }

  /**
   * Read a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding.
   *
   * @param in the stream to read from
   * @param objectWritable if non-null, receives the declared class and the
   *        instance that were read
   * @param conf configuration used to load classes and create Writables;
   *        may be null, in which case classes are loaded directly
   * @return the instance read; null for a void value or a null placeholder
   * @throws IOException if the underlying stream fails
   */
  @SuppressWarnings("unchecked")
  public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
    throws IOException {
    String className = UTF8.readString(in);
    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
    if (declaredClass == null) {
      declaredClass = loadClass(conf, className);
    }

    Object instance;

    if (declaredClass.isPrimitive()) {            // primitive types

      if (declaredClass == Boolean.TYPE) {             // boolean
        instance = Boolean.valueOf(in.readBoolean());
      } else if (declaredClass == Character.TYPE) {    // char
        instance = Character.valueOf(in.readChar());
      } else if (declaredClass == Byte.TYPE) {         // byte
        instance = Byte.valueOf(in.readByte());
      } else if (declaredClass == Short.TYPE) {        // short
        instance = Short.valueOf(in.readShort());
      } else if (declaredClass == Integer.TYPE) {      // int
        instance = Integer.valueOf(in.readInt());
      } else if (declaredClass == Long.TYPE) {         // long
        instance = Long.valueOf(in.readLong());
      } else if (declaredClass == Float.TYPE) {        // float
        instance = Float.valueOf(in.readFloat());
      } else if (declaredClass == Double.TYPE) {       // double
        instance = Double.valueOf(in.readDouble());
      } else if (declaredClass == Void.TYPE) {         // void
        instance = null;
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }

    } else if (declaredClass.isArray()) {              // array
      int length = in.readInt();
      instance = Array.newInstance(declaredClass.getComponentType(), length);
      for (int i = 0; i < length; i++) {
        Array.set(instance, i, readObject(in, conf));
      }

    } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
      // Read and unwrap ArrayPrimitiveWritable$Internal array.
      // Always allow the read, even if write is disabled by allowCompactArrays.
      ArrayPrimitiveWritable.Internal temp =
          new ArrayPrimitiveWritable.Internal();
      temp.readFields(in);
      instance = temp.get();
      // Report the class of the unwrapped array, not the wrapper.
      declaredClass = instance.getClass();

    } else if (declaredClass == String.class) {        // String
      instance = UTF8.readString(in);
    } else if (declaredClass.isEnum()) {         // enum
      instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
    } else if (Message.class.isAssignableFrom(declaredClass)) {
      instance = tryInstantiateProtobuf(declaredClass, in);
    } else {                                      // Writable
      Class instanceClass = null;
      String str = UTF8.readString(in);
      instanceClass = loadClass(conf, str);

      Writable writable = WritableFactories.newInstance(instanceClass, conf);
      writable.readFields(in);
      instance = writable;

      if (instanceClass == NullInstance.class) {  // null
        // Restore the originally declared class carried by the placeholder.
        declaredClass = ((NullInstance)instance).declaredClass;
        instance = null;
      }
    }

    if (objectWritable != null) {                 // store values
      objectWritable.declaredClass = declaredClass;
      objectWritable.instance = instance;
    }

    return instance;

  }

  /**
   * Try to instantiate a protocol buffer of the given message class
   * from the given input stream.
   *
   * @param protoClass the class of the generated protocol buffer
   * @param dataIn the input stream to read from
   * @return the instantiated Message instance
   * @throws IOException if an IO problem occurs
   */
  private static Message tryInstantiateProtobuf(
      Class<?> protoClass,
      DataInput dataIn) throws IOException {

    try {
      if (dataIn instanceof InputStream) {
        // We can use the built-in parseDelimitedFrom and not have to re-copy
        // the data
        Method parseMethod = getStaticProtobufMethod(protoClass,
            "parseDelimitedFrom", InputStream.class);
        return (Message)parseMethod.invoke(null, (InputStream)dataIn);
      } else {
        // Have to read it into a buffer first, since protobuf doesn't deal
        // with the DataInput interface directly.

        // Read the size delimiter that writeDelimitedTo writes
        int size = ProtoUtil.readRawVarint32(dataIn);
        if (size < 0) {
          throw new IOException("Invalid size: " + size);
        }

        byte[] data = new byte[size];
        dataIn.readFully(data);
        Method parseMethod = getStaticProtobufMethod(protoClass,
            "parseFrom", byte[].class);
        return (Message)parseMethod.invoke(null, data);
      }
    } catch (InvocationTargetException e) {

      // Surface the parser's own failure; wrap anything that is not already
      // an IOException.
      if (e.getCause() instanceof IOException) {
        throw (IOException)e.getCause();
      } else {
        throw new IOException(e.getCause());
      }
    } catch (IllegalAccessException iae) {
      // Chain the cause so the reason for the access failure is not lost.
      AssertionError error = new AssertionError(
          "Could not access parse method in " + protoClass);
      error.initCause(iae);
      throw error;
    }
  }

  /**
   * Look up a public method on a generated protocol buffer class.
   *
   * @param declaredClass the protocol buffer class to search
   * @param method the name of the method to find
   * @param args the parameter types of the method
   * @return the matching method
   * @throws AssertionError if the lookup fails for any reason; the
   *         underlying exception is attached as the cause
   */
  static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
      Class<?> ... args) {

    try {
      return declaredClass.getMethod(method, args);
    } catch (Exception e) {
      // Describe the signature that was actually requested rather than a
      // fixed one, so the message is accurate for every caller.
      StringBuilder signature = new StringBuilder();
      signature.append(method).append('(');
      if (args != null) {
        for (int i = 0; i < args.length; i++) {
          if (i > 0) {
            signature.append(", ");
          }
          signature.append(args[i] == null ? "null" : args[i].getSimpleName());
        }
      }
      signature.append(')');

      // This is a bug in Hadoop - protobufs should all have this static method
      AssertionError error = new AssertionError("Protocol buffer class "
          + declaredClass + " does not have an accessible " + signature
          + " method!");
      error.initCause(e);
      throw error;
    }
  }

  /**
   * Find and load the class with given name <tt>className</tt> by first finding
   * it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
   * try load it directly.
   *
   * @param conf the configuration to load the class through, or null
   * @param className the name of the class to load
   * @return the loaded class
   * @throws RuntimeException if the class cannot be found; the
   *         {@link ClassNotFoundException} is attached as the cause
   */
  public static Class<?> loadClass(Configuration conf, String className) {
    Class<?> declaredClass = null;
    try {
      if (conf != null) {
        declaredClass = conf.getClassByName(className);
      } else {
        declaredClass = Class.forName(className);
      }
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("readObject can't find class " + className,
          e);
    }
    return declaredClass;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

}