001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.serializer.avro;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024
025import org.apache.avro.Schema;
026import org.apache.avro.io.BinaryDecoder;
027import org.apache.avro.io.BinaryEncoder;
028import org.apache.avro.io.DatumReader;
029import org.apache.avro.io.DatumWriter;
030import org.apache.avro.io.DecoderFactory;
031import org.apache.avro.io.EncoderFactory;
032import org.apache.hadoop.classification.InterfaceAudience;
033import org.apache.hadoop.classification.InterfaceStability;
034import org.apache.hadoop.conf.Configured;
035import org.apache.hadoop.io.serializer.Deserializer;
036import org.apache.hadoop.io.serializer.Serialization;
037import org.apache.hadoop.io.serializer.Serializer;
038
039/**
040 * Base class for providing serialization to Avro types.
041 */
042@InterfaceAudience.Public
043@InterfaceStability.Evolving
044public abstract class AvroSerialization<T> extends Configured 
045        implements Serialization<T>{
046  
047  @InterfaceAudience.Private
048  public static final String AVRO_SCHEMA_KEY = "Avro-Schema";
049
050  @Override
051  @InterfaceAudience.Private
052  public Deserializer<T> getDeserializer(Class<T> c) {
053    return new AvroDeserializer(c);
054  }
055
056  @Override
057  @InterfaceAudience.Private
058  public Serializer<T> getSerializer(Class<T> c) {
059    return new AvroSerializer(c);
060  }
061
062  /**
063   * Return an Avro Schema instance for the given class.
064   */
065  @InterfaceAudience.Private
066  public abstract Schema getSchema(T t);
067
068  /**
069   * Create and return Avro DatumWriter for the given class.
070   */
071  @InterfaceAudience.Private
072  public abstract DatumWriter<T> getWriter(Class<T> clazz);
073
074  /**
075   * Create and return Avro DatumReader for the given class.
076   */
077  @InterfaceAudience.Private
078  public abstract DatumReader<T> getReader(Class<T> clazz);
079
080  class AvroSerializer implements Serializer<T> {
081
082    private DatumWriter<T> writer;
083    private BinaryEncoder encoder;
084    private OutputStream outStream;
085
086    AvroSerializer(Class<T> clazz) {
087      this.writer = getWriter(clazz);
088    }
089
090    @Override
091    public void close() throws IOException {
092      encoder.flush();
093      outStream.close();
094    }
095
096    @Override
097    public void open(OutputStream out) throws IOException {
098      outStream = out;
099      encoder = EncoderFactory.get().binaryEncoder(out, encoder);
100    }
101
102    @Override
103    public void serialize(T t) throws IOException {
104      writer.setSchema(getSchema(t));
105      writer.write(t, encoder);
106    }
107
108  }
109
110  class AvroDeserializer implements Deserializer<T> {
111
112    private DatumReader<T> reader;
113    private BinaryDecoder decoder;
114    private InputStream inStream;
115
116    AvroDeserializer(Class<T> clazz) {
117      this.reader = getReader(clazz);
118    }
119
120    @Override
121    public void close() throws IOException {
122      inStream.close();
123    }
124
125    @Override
126    public T deserialize(T t) throws IOException {
127      return reader.read(t, decoder);
128    }
129
130    @Override
131    public void open(InputStream in) throws IOException {
132      inStream = in;
133      decoder = DecoderFactory.get().binaryDecoder(in, decoder);
134    }
135
136  }
137
138}