001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.serializer.avro;
020
021import java.util.HashSet;
022import java.util.Set;
023
024import org.apache.avro.Schema;
025import org.apache.avro.io.DatumReader;
026import org.apache.avro.io.DatumWriter;
027import org.apache.avro.reflect.ReflectData;
028import org.apache.avro.reflect.ReflectDatumReader;
029import org.apache.avro.reflect.ReflectDatumWriter;
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032
033/**
034 * Serialization for Avro Reflect classes. For a class to be accepted by this 
035 * serialization, it must either be in the package list configured via 
036 * <code>avro.reflect.pkgs</code> or implement 
037 * {@link AvroReflectSerializable} interface.
038 *
039 */
040@SuppressWarnings("unchecked")
041@InterfaceAudience.Public
042@InterfaceStability.Evolving
043public class AvroReflectSerialization extends AvroSerialization<Object>{
044
045  /**
046   * Key to configure packages that contain classes to be serialized and 
047   * deserialized using this class. Multiple packages can be specified using 
048   * comma-separated list.
049   */
050  @InterfaceAudience.Private
051  public static final String AVRO_REFLECT_PACKAGES = "avro.reflect.pkgs";
052
053  private Set<String> packages; 
054
055  @InterfaceAudience.Private
056  @Override
057  public synchronized boolean accept(Class<?> c) {
058    if (packages == null) {
059      getPackages();
060    }
061    return AvroReflectSerializable.class.isAssignableFrom(c) ||
062      (c.getPackage() != null && packages.contains(c.getPackage().getName()));
063  }
064
065  private void getPackages() {
066    String[] pkgList  = getConf().getStrings(AVRO_REFLECT_PACKAGES);
067    packages = new HashSet<String>();
068    if (pkgList != null) {
069      for (String pkg : pkgList) {
070        packages.add(pkg.trim());
071      }
072    }
073  }
074
075  @InterfaceAudience.Private
076  @Override
077  public DatumReader getReader(Class<Object> clazz) {
078    try {
079      return new ReflectDatumReader(clazz);
080    } catch (Exception e) {
081      throw new RuntimeException(e);
082    }
083  }
084
085  @InterfaceAudience.Private
086  @Override
087  public Schema getSchema(Object t) {
088    return ReflectData.get().getSchema(t.getClass());
089  }
090
091  @InterfaceAudience.Private
092  @Override
093  public DatumWriter getWriter(Class<Object> clazz) {
094    return new ReflectDatumWriter();
095  }
096
097}