001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io;
020
021 import java.lang.reflect.Array;
022 import java.lang.reflect.InvocationTargetException;
023 import java.lang.reflect.Method;
024
025 import java.io.*;
026 import java.util.*;
027
028 import org.apache.hadoop.classification.InterfaceAudience;
029 import org.apache.hadoop.classification.InterfaceStability;
030 import org.apache.hadoop.conf.*;
031 import org.apache.hadoop.util.ProtoUtil;
032
033 import com.google.protobuf.Message;
034
035 /** A polymorphic Writable that writes an instance with it's class name.
036 * Handles arrays, strings and primitive types without a Writable wrapper.
037 */
038 @InterfaceAudience.Public
039 @InterfaceStability.Stable
040 public class ObjectWritable implements Writable, Configurable {
041
042 private Class declaredClass;
043 private Object instance;
044 private Configuration conf;
045
046 public ObjectWritable() {}
047
048 public ObjectWritable(Object instance) {
049 set(instance);
050 }
051
052 public ObjectWritable(Class declaredClass, Object instance) {
053 this.declaredClass = declaredClass;
054 this.instance = instance;
055 }
056
057 /** Return the instance, or null if none. */
058 public Object get() { return instance; }
059
060 /** Return the class this is meant to be. */
061 public Class getDeclaredClass() { return declaredClass; }
062
063 /** Reset the instance. */
064 public void set(Object instance) {
065 this.declaredClass = instance.getClass();
066 this.instance = instance;
067 }
068
069 @Override
070 public String toString() {
071 return "OW[class=" + declaredClass + ",value=" + instance + "]";
072 }
073
074
075 @Override
076 public void readFields(DataInput in) throws IOException {
077 readObject(in, this, this.conf);
078 }
079
080 @Override
081 public void write(DataOutput out) throws IOException {
082 writeObject(out, instance, declaredClass, conf);
083 }
084
085 private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
086 static {
087 PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
088 PRIMITIVE_NAMES.put("byte", Byte.TYPE);
089 PRIMITIVE_NAMES.put("char", Character.TYPE);
090 PRIMITIVE_NAMES.put("short", Short.TYPE);
091 PRIMITIVE_NAMES.put("int", Integer.TYPE);
092 PRIMITIVE_NAMES.put("long", Long.TYPE);
093 PRIMITIVE_NAMES.put("float", Float.TYPE);
094 PRIMITIVE_NAMES.put("double", Double.TYPE);
095 PRIMITIVE_NAMES.put("void", Void.TYPE);
096 }
097
098 private static class NullInstance extends Configured implements Writable {
099 private Class<?> declaredClass;
100 public NullInstance() { super(null); }
101 public NullInstance(Class declaredClass, Configuration conf) {
102 super(conf);
103 this.declaredClass = declaredClass;
104 }
105 @Override
106 public void readFields(DataInput in) throws IOException {
107 String className = UTF8.readString(in);
108 declaredClass = PRIMITIVE_NAMES.get(className);
109 if (declaredClass == null) {
110 try {
111 declaredClass = getConf().getClassByName(className);
112 } catch (ClassNotFoundException e) {
113 throw new RuntimeException(e.toString());
114 }
115 }
116 }
117 @Override
118 public void write(DataOutput out) throws IOException {
119 UTF8.writeString(out, declaredClass.getName());
120 }
121 }
122
123 /** Write a {@link Writable}, {@link String}, primitive type, or an array of
124 * the preceding. */
125 public static void writeObject(DataOutput out, Object instance,
126 Class declaredClass,
127 Configuration conf) throws IOException {
128 writeObject(out, instance, declaredClass, conf, false);
129 }
130
131 /**
132 * Write a {@link Writable}, {@link String}, primitive type, or an array of
133 * the preceding.
134 *
135 * @param allowCompactArrays - set true for RPC and internal or intra-cluster
136 * usages. Set false for inter-cluster, File, and other persisted output
137 * usages, to preserve the ability to interchange files with other clusters
138 * that may not be running the same version of software. Sometime in ~2013
139 * we can consider removing this parameter and always using the compact format.
140 */
141 public static void writeObject(DataOutput out, Object instance,
142 Class declaredClass, Configuration conf, boolean allowCompactArrays)
143 throws IOException {
144
145 if (instance == null) { // null
146 instance = new NullInstance(declaredClass, conf);
147 declaredClass = Writable.class;
148 }
149
150 // Special case: must come before writing out the declaredClass.
151 // If this is an eligible array of primitives,
152 // wrap it in an ArrayPrimitiveWritable$Internal wrapper class.
153 if (allowCompactArrays && declaredClass.isArray()
154 && instance.getClass().getName().equals(declaredClass.getName())
155 && instance.getClass().getComponentType().isPrimitive()) {
156 instance = new ArrayPrimitiveWritable.Internal(instance);
157 declaredClass = ArrayPrimitiveWritable.Internal.class;
158 }
159
160 UTF8.writeString(out, declaredClass.getName()); // always write declared
161
162 if (declaredClass.isArray()) { // non-primitive or non-compact array
163 int length = Array.getLength(instance);
164 out.writeInt(length);
165 for (int i = 0; i < length; i++) {
166 writeObject(out, Array.get(instance, i),
167 declaredClass.getComponentType(), conf, allowCompactArrays);
168 }
169
170 } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
171 ((ArrayPrimitiveWritable.Internal) instance).write(out);
172
173 } else if (declaredClass == String.class) { // String
174 UTF8.writeString(out, (String)instance);
175
176 } else if (declaredClass.isPrimitive()) { // primitive type
177
178 if (declaredClass == Boolean.TYPE) { // boolean
179 out.writeBoolean(((Boolean)instance).booleanValue());
180 } else if (declaredClass == Character.TYPE) { // char
181 out.writeChar(((Character)instance).charValue());
182 } else if (declaredClass == Byte.TYPE) { // byte
183 out.writeByte(((Byte)instance).byteValue());
184 } else if (declaredClass == Short.TYPE) { // short
185 out.writeShort(((Short)instance).shortValue());
186 } else if (declaredClass == Integer.TYPE) { // int
187 out.writeInt(((Integer)instance).intValue());
188 } else if (declaredClass == Long.TYPE) { // long
189 out.writeLong(((Long)instance).longValue());
190 } else if (declaredClass == Float.TYPE) { // float
191 out.writeFloat(((Float)instance).floatValue());
192 } else if (declaredClass == Double.TYPE) { // double
193 out.writeDouble(((Double)instance).doubleValue());
194 } else if (declaredClass == Void.TYPE) { // void
195 } else {
196 throw new IllegalArgumentException("Not a primitive: "+declaredClass);
197 }
198 } else if (declaredClass.isEnum()) { // enum
199 UTF8.writeString(out, ((Enum)instance).name());
200 } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
201 UTF8.writeString(out, instance.getClass().getName());
202 ((Writable)instance).write(out);
203
204 } else if (Message.class.isAssignableFrom(declaredClass)) {
205 ((Message)instance).writeDelimitedTo(
206 DataOutputOutputStream.constructOutputStream(out));
207 } else {
208 throw new IOException("Can't write: "+instance+" as "+declaredClass);
209 }
210 }
211
212
213 /** Read a {@link Writable}, {@link String}, primitive type, or an array of
214 * the preceding. */
215 public static Object readObject(DataInput in, Configuration conf)
216 throws IOException {
217 return readObject(in, null, conf);
218 }
219
220 /** Read a {@link Writable}, {@link String}, primitive type, or an array of
221 * the preceding. */
222 @SuppressWarnings("unchecked")
223 public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
224 throws IOException {
225 String className = UTF8.readString(in);
226 Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
227 if (declaredClass == null) {
228 declaredClass = loadClass(conf, className);
229 }
230
231 Object instance;
232
233 if (declaredClass.isPrimitive()) { // primitive types
234
235 if (declaredClass == Boolean.TYPE) { // boolean
236 instance = Boolean.valueOf(in.readBoolean());
237 } else if (declaredClass == Character.TYPE) { // char
238 instance = Character.valueOf(in.readChar());
239 } else if (declaredClass == Byte.TYPE) { // byte
240 instance = Byte.valueOf(in.readByte());
241 } else if (declaredClass == Short.TYPE) { // short
242 instance = Short.valueOf(in.readShort());
243 } else if (declaredClass == Integer.TYPE) { // int
244 instance = Integer.valueOf(in.readInt());
245 } else if (declaredClass == Long.TYPE) { // long
246 instance = Long.valueOf(in.readLong());
247 } else if (declaredClass == Float.TYPE) { // float
248 instance = Float.valueOf(in.readFloat());
249 } else if (declaredClass == Double.TYPE) { // double
250 instance = Double.valueOf(in.readDouble());
251 } else if (declaredClass == Void.TYPE) { // void
252 instance = null;
253 } else {
254 throw new IllegalArgumentException("Not a primitive: "+declaredClass);
255 }
256
257 } else if (declaredClass.isArray()) { // array
258 int length = in.readInt();
259 instance = Array.newInstance(declaredClass.getComponentType(), length);
260 for (int i = 0; i < length; i++) {
261 Array.set(instance, i, readObject(in, conf));
262 }
263
264 } else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
265 // Read and unwrap ArrayPrimitiveWritable$Internal array.
266 // Always allow the read, even if write is disabled by allowCompactArrays.
267 ArrayPrimitiveWritable.Internal temp =
268 new ArrayPrimitiveWritable.Internal();
269 temp.readFields(in);
270 instance = temp.get();
271 declaredClass = instance.getClass();
272
273 } else if (declaredClass == String.class) { // String
274 instance = UTF8.readString(in);
275 } else if (declaredClass.isEnum()) { // enum
276 instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
277 } else if (Message.class.isAssignableFrom(declaredClass)) {
278 instance = tryInstantiateProtobuf(declaredClass, in);
279 } else { // Writable
280 Class instanceClass = null;
281 String str = UTF8.readString(in);
282 instanceClass = loadClass(conf, str);
283
284 Writable writable = WritableFactories.newInstance(instanceClass, conf);
285 writable.readFields(in);
286 instance = writable;
287
288 if (instanceClass == NullInstance.class) { // null
289 declaredClass = ((NullInstance)instance).declaredClass;
290 instance = null;
291 }
292 }
293
294 if (objectWritable != null) { // store values
295 objectWritable.declaredClass = declaredClass;
296 objectWritable.instance = instance;
297 }
298
299 return instance;
300
301 }
302
303 /**
304 * Try to instantiate a protocol buffer of the given message class
305 * from the given input stream.
306 *
307 * @param protoClass the class of the generated protocol buffer
308 * @param dataIn the input stream to read from
309 * @return the instantiated Message instance
310 * @throws IOException if an IO problem occurs
311 */
312 private static Message tryInstantiateProtobuf(
313 Class<?> protoClass,
314 DataInput dataIn) throws IOException {
315
316 try {
317 if (dataIn instanceof InputStream) {
318 // We can use the built-in parseDelimitedFrom and not have to re-copy
319 // the data
320 Method parseMethod = getStaticProtobufMethod(protoClass,
321 "parseDelimitedFrom", InputStream.class);
322 return (Message)parseMethod.invoke(null, (InputStream)dataIn);
323 } else {
324 // Have to read it into a buffer first, since protobuf doesn't deal
325 // with the DataInput interface directly.
326
327 // Read the size delimiter that writeDelimitedTo writes
328 int size = ProtoUtil.readRawVarint32(dataIn);
329 if (size < 0) {
330 throw new IOException("Invalid size: " + size);
331 }
332
333 byte[] data = new byte[size];
334 dataIn.readFully(data);
335 Method parseMethod = getStaticProtobufMethod(protoClass,
336 "parseFrom", byte[].class);
337 return (Message)parseMethod.invoke(null, data);
338 }
339 } catch (InvocationTargetException e) {
340
341 if (e.getCause() instanceof IOException) {
342 throw (IOException)e.getCause();
343 } else {
344 throw new IOException(e.getCause());
345 }
346 } catch (IllegalAccessException iae) {
347 throw new AssertionError("Could not access parse method in " +
348 protoClass);
349 }
350 }
351
352 static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
353 Class<?> ... args) {
354
355 try {
356 return declaredClass.getMethod(method, args);
357 } catch (Exception e) {
358 // This is a bug in Hadoop - protobufs should all have this static method
359 throw new AssertionError("Protocol buffer class " + declaredClass +
360 " does not have an accessible parseFrom(InputStream) method!");
361 }
362 }
363
364 /**
365 * Find and load the class with given name <tt>className</tt> by first finding
366 * it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
367 * try load it directly.
368 */
369 public static Class<?> loadClass(Configuration conf, String className) {
370 Class<?> declaredClass = null;
371 try {
372 if (conf != null)
373 declaredClass = conf.getClassByName(className);
374 else
375 declaredClass = Class.forName(className);
376 } catch (ClassNotFoundException e) {
377 throw new RuntimeException("readObject can't find class " + className,
378 e);
379 }
380 return declaredClass;
381 }
382
383 @Override
384 public void setConf(Configuration conf) {
385 this.conf = conf;
386 }
387
388 @Override
389 public Configuration getConf() {
390 return this.conf;
391 }
392
393 }