001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io.serializer;
020
021 import java.io.DataInputStream;
022 import java.io.DataOutputStream;
023 import java.io.IOException;
024 import java.io.InputStream;
025 import java.io.OutputStream;
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.conf.Configured;
030 import org.apache.hadoop.io.Writable;
031 import org.apache.hadoop.util.ReflectionUtils;
032
033 /**
034 * A {@link Serialization} for {@link Writable}s that delegates to
035 * {@link Writable#write(java.io.DataOutput)} and
036 * {@link Writable#readFields(java.io.DataInput)}.
037 */
038 @InterfaceAudience.Public
039 @InterfaceStability.Evolving
040 public class WritableSerialization extends Configured
041 implements Serialization<Writable> {
042 static class WritableDeserializer extends Configured
043 implements Deserializer<Writable> {
044
045 private Class<?> writableClass;
046 private DataInputStream dataIn;
047
048 public WritableDeserializer(Configuration conf, Class<?> c) {
049 setConf(conf);
050 this.writableClass = c;
051 }
052
053 @Override
054 public void open(InputStream in) {
055 if (in instanceof DataInputStream) {
056 dataIn = (DataInputStream) in;
057 } else {
058 dataIn = new DataInputStream(in);
059 }
060 }
061
062 @Override
063 public Writable deserialize(Writable w) throws IOException {
064 Writable writable;
065 if (w == null) {
066 writable
067 = (Writable) ReflectionUtils.newInstance(writableClass, getConf());
068 } else {
069 writable = w;
070 }
071 writable.readFields(dataIn);
072 return writable;
073 }
074
075 @Override
076 public void close() throws IOException {
077 dataIn.close();
078 }
079
080 }
081
082 static class WritableSerializer extends Configured implements
083 Serializer<Writable> {
084
085 private DataOutputStream dataOut;
086
087 @Override
088 public void open(OutputStream out) {
089 if (out instanceof DataOutputStream) {
090 dataOut = (DataOutputStream) out;
091 } else {
092 dataOut = new DataOutputStream(out);
093 }
094 }
095
096 @Override
097 public void serialize(Writable w) throws IOException {
098 w.write(dataOut);
099 }
100
101 @Override
102 public void close() throws IOException {
103 dataOut.close();
104 }
105
106 }
107
108 @InterfaceAudience.Private
109 @Override
110 public boolean accept(Class<?> c) {
111 return Writable.class.isAssignableFrom(c);
112 }
113
114 @InterfaceAudience.Private
115 @Override
116 public Serializer<Writable> getSerializer(Class<Writable> c) {
117 return new WritableSerializer();
118 }
119
120 @InterfaceAudience.Private
121 @Override
122 public Deserializer<Writable> getDeserializer(Class<Writable> c) {
123 return new WritableDeserializer(getConf(), c);
124 }
125
126 }