001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.serializer;
020    
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.OutputStream;
026    import org.apache.hadoop.classification.InterfaceAudience;
027    import org.apache.hadoop.classification.InterfaceStability;
028    import org.apache.hadoop.conf.Configuration;
029    import org.apache.hadoop.conf.Configured;
030    import org.apache.hadoop.io.Writable;
031    import org.apache.hadoop.util.ReflectionUtils;
032    
033    /**
034     * A {@link Serialization} for {@link Writable}s that delegates to
035     * {@link Writable#write(java.io.DataOutput)} and
036     * {@link Writable#readFields(java.io.DataInput)}.
037     */
038    @InterfaceAudience.Public
039    @InterfaceStability.Evolving
040    public class WritableSerialization extends Configured
041            implements Serialization<Writable> {
042      static class WritableDeserializer extends Configured
043            implements Deserializer<Writable> {
044    
045        private Class<?> writableClass;
046        private DataInputStream dataIn;
047        
048        public WritableDeserializer(Configuration conf, Class<?> c) {
049          setConf(conf);
050          this.writableClass = c;
051        }
052        
053        @Override
054        public void open(InputStream in) {
055          if (in instanceof DataInputStream) {
056            dataIn = (DataInputStream) in;
057          } else {
058            dataIn = new DataInputStream(in);
059          }
060        }
061        
062        @Override
063        public Writable deserialize(Writable w) throws IOException {
064          Writable writable;
065          if (w == null) {
066            writable 
067              = (Writable) ReflectionUtils.newInstance(writableClass, getConf());
068          } else {
069            writable = w;
070          }
071          writable.readFields(dataIn);
072          return writable;
073        }
074    
075        @Override
076        public void close() throws IOException {
077          dataIn.close();
078        }
079        
080      }
081      
082      static class WritableSerializer extends Configured implements
083            Serializer<Writable> {
084        
085        private DataOutputStream dataOut;
086        
087        @Override
088        public void open(OutputStream out) {
089          if (out instanceof DataOutputStream) {
090            dataOut = (DataOutputStream) out;
091          } else {
092            dataOut = new DataOutputStream(out);
093          }
094        }
095    
096        @Override
097        public void serialize(Writable w) throws IOException {
098          w.write(dataOut);
099        }
100    
101        @Override
102        public void close() throws IOException {
103          dataOut.close();
104        }
105    
106      }
107    
108      @InterfaceAudience.Private
109      @Override
110      public boolean accept(Class<?> c) {
111        return Writable.class.isAssignableFrom(c);
112      }
113    
114      @InterfaceAudience.Private
115      @Override
116      public Serializer<Writable> getSerializer(Class<Writable> c) {
117        return new WritableSerializer();
118      }
119      
120      @InterfaceAudience.Private
121      @Override
122      public Deserializer<Writable> getDeserializer(Class<Writable> c) {
123        return new WritableDeserializer(getConf(), c);
124      }
125    
126    }