001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.serializer.avro; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024 025import org.apache.avro.Schema; 026import org.apache.avro.io.BinaryDecoder; 027import org.apache.avro.io.BinaryEncoder; 028import org.apache.avro.io.DatumReader; 029import org.apache.avro.io.DatumWriter; 030import org.apache.avro.io.DecoderFactory; 031import org.apache.avro.io.EncoderFactory; 032import org.apache.hadoop.classification.InterfaceAudience; 033import org.apache.hadoop.classification.InterfaceStability; 034import org.apache.hadoop.conf.Configured; 035import org.apache.hadoop.io.serializer.Deserializer; 036import org.apache.hadoop.io.serializer.Serialization; 037import org.apache.hadoop.io.serializer.Serializer; 038 039/** 040 * Base class for providing serialization to Avro types. 041 */ 042@InterfaceAudience.Public 043@InterfaceStability.Evolving 044public abstract class AvroSerialization<T> extends Configured 045 implements Serialization<T>{ 046 047 @InterfaceAudience.Private 048 public static final String AVRO_SCHEMA_KEY = "Avro-Schema"; 049 050 @Override 051 @InterfaceAudience.Private 052 public Deserializer<T> getDeserializer(Class<T> c) { 053 return new AvroDeserializer(c); 054 } 055 056 @Override 057 @InterfaceAudience.Private 058 public Serializer<T> getSerializer(Class<T> c) { 059 return new AvroSerializer(c); 060 } 061 062 /** 063 * Return an Avro Schema instance for the given class. 064 */ 065 @InterfaceAudience.Private 066 public abstract Schema getSchema(T t); 067 068 /** 069 * Create and return Avro DatumWriter for the given class. 070 */ 071 @InterfaceAudience.Private 072 public abstract DatumWriter<T> getWriter(Class<T> clazz); 073 074 /** 075 * Create and return Avro DatumReader for the given class. 076 */ 077 @InterfaceAudience.Private 078 public abstract DatumReader<T> getReader(Class<T> clazz); 079 080 class AvroSerializer implements Serializer<T> { 081 082 private DatumWriter<T> writer; 083 private BinaryEncoder encoder; 084 private OutputStream outStream; 085 086 AvroSerializer(Class<T> clazz) { 087 this.writer = getWriter(clazz); 088 } 089 090 @Override 091 public void close() throws IOException { 092 encoder.flush(); 093 outStream.close(); 094 } 095 096 @Override 097 public void open(OutputStream out) throws IOException { 098 outStream = out; 099 encoder = EncoderFactory.get().binaryEncoder(out, encoder); 100 } 101 102 @Override 103 public void serialize(T t) throws IOException { 104 writer.setSchema(getSchema(t)); 105 writer.write(t, encoder); 106 } 107 108 } 109 110 class AvroDeserializer implements Deserializer<T> { 111 112 private DatumReader<T> reader; 113 private BinaryDecoder decoder; 114 private InputStream inStream; 115 116 AvroDeserializer(Class<T> clazz) { 117 this.reader = getReader(clazz); 118 } 119 120 @Override 121 public void close() throws IOException { 122 inStream.close(); 123 } 124 125 @Override 126 public T deserialize(T t) throws IOException { 127 return reader.read(t, decoder); 128 } 129 130 @Override 131 public void open(InputStream in) throws IOException { 132 inStream = in; 133 decoder = DecoderFactory.get().binaryDecoder(in, decoder); 134 } 135 136 } 137 138}