/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.serializer.avro;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;

/**
 * Base class for providing serialization to Avro types.
 *
 * <p>Subclasses supply the Avro {@link Schema}, {@link DatumWriter} and
 * {@link DatumReader} for the type {@code T}; this class wires them into
 * {@link Serializer} and {@link Deserializer} implementations that use
 * Avro's binary encoder and decoder.
 *
 * @param <T> the type of object handled by this serialization
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class AvroSerialization<T> extends Configured
    implements Serialization<T> {

  /**
   * String constant {@code "Avro-Schema"}. It is not read anywhere in this
   * class; NOTE(review): presumably used by callers as a metadata or
   * configuration key -- confirm against its users.
   */
  @InterfaceAudience.Private
  public static final String AVRO_SCHEMA_KEY = "Avro-Schema";

  /**
   * Creates a deserializer backed by the reader from
   * {@link #getReader(Class)}.
   *
   * @param c the class to deserialize
   * @return a new, not yet opened, deserializer
   */
  @Override
  @InterfaceAudience.Private
  public Deserializer<T> getDeserializer(Class<T> c) {
    return new AvroDeserializer(c);
  }

  /**
   * Creates a serializer backed by the writer from
   * {@link #getWriter(Class)}.
   *
   * @param c the class to serialize
   * @return a new, not yet opened, serializer
   */
  @Override
  @InterfaceAudience.Private
  public Serializer<T> getSerializer(Class<T> c) {
    return new AvroSerializer(c);
  }

  /**
   * Return an Avro Schema instance for the given object. Called once per
   * record by the serializer, immediately before the record is written.
   *
   * @param t the object about to be serialized
   * @return the schema to use when writing {@code t}
   */
  @InterfaceAudience.Private
  public abstract Schema getSchema(T t);

  /**
   * Create and return Avro DatumWriter for the given class.
   *
   * @param clazz the class to be written
   * @return the writer used by serializers created from this instance
   */
  @InterfaceAudience.Private
  public abstract DatumWriter<T> getWriter(Class<T> clazz);

  /**
   * Create and return Avro DatumReader for the given class.
   *
   * @param clazz the class to be read
   * @return the reader used by deserializers created from this instance
   */
  @InterfaceAudience.Private
  public abstract DatumReader<T> getReader(Class<T> clazz);

  /**
   * {@link Serializer} that writes records through a {@link BinaryEncoder}.
   * Deliberately a non-static inner class: it calls the enclosing
   * instance's {@link #getWriter(Class)} and {@link #getSchema(Object)}.
   */
  class AvroSerializer implements Serializer<T> {

    private DatumWriter<T> writer;
    private BinaryEncoder encoder;
    private OutputStream outStream;

    AvroSerializer(Class<T> clazz) {
      this.writer = getWriter(clazz);
    }

    /**
     * Flushes the encoder and closes the underlying stream. The stream is
     * closed even if the flush fails, so a write error cannot leak it.
     * Calling this before {@link #open(OutputStream)} is a no-op.
     *
     * @throws IOException if flushing or closing fails
     */
    @Override
    public void close() throws IOException {
      try {
        if (encoder != null) {
          encoder.flush();
        }
      } finally {
        if (outStream != null) {
          outStream.close();
        }
      }
    }

    /**
     * Binds this serializer to {@code out}.
     *
     * @param out the stream that serialized records are written to
     * @throws IOException declared by the interface
     */
    @Override
    public void open(OutputStream out) throws IOException {
      outStream = out;
      // The current encoder (null on first open) is handed back to the
      // factory; presumably this lets Avro reuse it -- see EncoderFactory.
      encoder = EncoderFactory.get().binaryEncoder(out, encoder);
    }

    /**
     * Writes one record. The schema is looked up per record via
     * {@link #getSchema(Object)} and set on the writer before writing.
     * The encoder is not flushed here; {@link #close()} flushes it.
     *
     * @param t the record to write
     * @throws IOException if the write fails
     */
    @Override
    public void serialize(T t) throws IOException {
      writer.setSchema(getSchema(t));
      writer.write(t, encoder);
    }

  }

  /**
   * {@link Deserializer} that reads records through a {@link BinaryDecoder}.
   * Deliberately a non-static inner class: it calls the enclosing
   * instance's {@link #getReader(Class)}.
   */
  class AvroDeserializer implements Deserializer<T> {

    private DatumReader<T> reader;
    private BinaryDecoder decoder;
    private InputStream inStream;

    AvroDeserializer(Class<T> clazz) {
      this.reader = getReader(clazz);
    }

    /**
     * Closes the underlying stream. Calling this before
     * {@link #open(InputStream)} is a no-op.
     *
     * @throws IOException if closing the stream fails
     */
    @Override
    public void close() throws IOException {
      if (inStream != null) {
        inStream.close();
      }
    }

    /**
     * Reads the next record from the decoder.
     *
     * @param t an instance passed through to the reader as its first
     *          argument (presumably a reuse candidate -- see DatumReader)
     * @return the record returned by the reader
     * @throws IOException if the read fails
     */
    @Override
    public T deserialize(T t) throws IOException {
      return reader.read(t, decoder);
    }

    /**
     * Binds this deserializer to {@code in}.
     *
     * @param in the stream that records are read from
     * @throws IOException declared by the interface
     */
    @Override
    public void open(InputStream in) throws IOException {
      inStream = in;
      // The current decoder (null on first open) is handed back to the
      // factory; presumably this lets Avro reuse it -- see DecoderFactory.
      decoder = DecoderFactory.get().binaryDecoder(in, decoder);
    }

  }

}