001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.serializer;
020
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.conf.Configured;
030import org.apache.hadoop.io.Writable;
031import org.apache.hadoop.util.ReflectionUtils;
032
033/**
034 * A {@link Serialization} for {@link Writable}s that delegates to
035 * {@link Writable#write(java.io.DataOutput)} and
036 * {@link Writable#readFields(java.io.DataInput)}.
037 */
038@InterfaceAudience.Public
039@InterfaceStability.Evolving
040public class WritableSerialization extends Configured
041        implements Serialization<Writable> {
042  static class WritableDeserializer extends Configured
043        implements Deserializer<Writable> {
044
045    private Class<?> writableClass;
046    private DataInputStream dataIn;
047    
048    public WritableDeserializer(Configuration conf, Class<?> c) {
049      setConf(conf);
050      this.writableClass = c;
051    }
052    
053    @Override
054    public void open(InputStream in) {
055      if (in instanceof DataInputStream) {
056        dataIn = (DataInputStream) in;
057      } else {
058        dataIn = new DataInputStream(in);
059      }
060    }
061    
062    @Override
063    public Writable deserialize(Writable w) throws IOException {
064      Writable writable;
065      if (w == null) {
066        writable 
067          = (Writable) ReflectionUtils.newInstance(writableClass, getConf());
068      } else {
069        writable = w;
070      }
071      writable.readFields(dataIn);
072      return writable;
073    }
074
075    @Override
076    public void close() throws IOException {
077      dataIn.close();
078    }
079    
080  }
081  
082  static class WritableSerializer extends Configured implements
083        Serializer<Writable> {
084    
085    private DataOutputStream dataOut;
086    
087    @Override
088    public void open(OutputStream out) {
089      if (out instanceof DataOutputStream) {
090        dataOut = (DataOutputStream) out;
091      } else {
092        dataOut = new DataOutputStream(out);
093      }
094    }
095
096    @Override
097    public void serialize(Writable w) throws IOException {
098      w.write(dataOut);
099    }
100
101    @Override
102    public void close() throws IOException {
103      dataOut.close();
104    }
105
106  }
107
108  @InterfaceAudience.Private
109  @Override
110  public boolean accept(Class<?> c) {
111    return Writable.class.isAssignableFrom(c);
112  }
113
114  @InterfaceAudience.Private
115  @Override
116  public Serializer<Writable> getSerializer(Class<Writable> c) {
117    return new WritableSerializer();
118  }
119  
120  @InterfaceAudience.Private
121  @Override
122  public Deserializer<Writable> getDeserializer(Class<Writable> c) {
123    return new WritableDeserializer(getConf(), c);
124  }
125
126}