001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.serializer.avro;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024
025import org.apache.avro.Schema;
026import org.apache.avro.io.BinaryDecoder;
027import org.apache.avro.io.BinaryEncoder;
028import org.apache.avro.io.DatumReader;
029import org.apache.avro.io.DatumWriter;
030import org.apache.avro.io.DecoderFactory;
031import org.apache.avro.io.EncoderFactory;
032import org.apache.hadoop.classification.InterfaceAudience;
033import org.apache.hadoop.classification.InterfaceStability;
034import org.apache.hadoop.conf.Configured;
035import org.apache.hadoop.io.serializer.Deserializer;
036import org.apache.hadoop.io.serializer.Serialization;
037import org.apache.hadoop.io.serializer.Serializer;
038
039/**
040 * Base class for providing serialization to Avro types.
041 */
042@InterfaceAudience.Public
043@InterfaceStability.Evolving
044public abstract class AvroSerialization<T> extends Configured 
045        implements Serialization<T>{
046  
047  @InterfaceAudience.Private
048  public static final String AVRO_SCHEMA_KEY = "Avro-Schema";
049
050  @InterfaceAudience.Private
051  public Deserializer<T> getDeserializer(Class<T> c) {
052    return new AvroDeserializer(c);
053  }
054
055  @InterfaceAudience.Private
056  public Serializer<T> getSerializer(Class<T> c) {
057    return new AvroSerializer(c);
058  }
059
060  /**
061   * Return an Avro Schema instance for the given class.
062   */
063  @InterfaceAudience.Private
064  public abstract Schema getSchema(T t);
065
066  /**
067   * Create and return Avro DatumWriter for the given class.
068   */
069  @InterfaceAudience.Private
070  public abstract DatumWriter<T> getWriter(Class<T> clazz);
071
072  /**
073   * Create and return Avro DatumReader for the given class.
074   */
075  @InterfaceAudience.Private
076  public abstract DatumReader<T> getReader(Class<T> clazz);
077
078  class AvroSerializer implements Serializer<T> {
079
080    private DatumWriter<T> writer;
081    private BinaryEncoder encoder;
082    private OutputStream outStream;
083
084    AvroSerializer(Class<T> clazz) {
085      this.writer = getWriter(clazz);
086    }
087
088    @Override
089    public void close() throws IOException {
090      encoder.flush();
091      outStream.close();
092    }
093
094    @Override
095    public void open(OutputStream out) throws IOException {
096      outStream = out;
097      encoder = EncoderFactory.get().binaryEncoder(out, encoder);
098    }
099
100    @Override
101    public void serialize(T t) throws IOException {
102      writer.setSchema(getSchema(t));
103      writer.write(t, encoder);
104    }
105
106  }
107
108  class AvroDeserializer implements Deserializer<T> {
109
110    private DatumReader<T> reader;
111    private BinaryDecoder decoder;
112    private InputStream inStream;
113
114    AvroDeserializer(Class<T> clazz) {
115      this.reader = getReader(clazz);
116    }
117
118    @Override
119    public void close() throws IOException {
120      inStream.close();
121    }
122
123    @Override
124    public T deserialize(T t) throws IOException {
125      return reader.read(t, decoder);
126    }
127
128    @Override
129    public void open(InputStream in) throws IOException {
130      inStream = in;
131      decoder = DecoderFactory.get().binaryDecoder(in, decoder);
132    }
133
134  }
135
136}