001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.serializer.avro;
020    
021    import java.util.HashSet;
022    import java.util.Set;
023    
024    import org.apache.avro.Schema;
025    import org.apache.avro.io.DatumReader;
026    import org.apache.avro.io.DatumWriter;
027    import org.apache.avro.reflect.ReflectData;
028    import org.apache.avro.reflect.ReflectDatumReader;
029    import org.apache.avro.reflect.ReflectDatumWriter;
030    import org.apache.hadoop.classification.InterfaceAudience;
031    import org.apache.hadoop.classification.InterfaceStability;
032    
033    /**
034     * Serialization for Avro Reflect classes. For a class to be accepted by this 
035     * serialization, it must either be in the package list configured via 
036     * <code>avro.reflect.pkgs</code> or implement 
037     * {@link AvroReflectSerializable} interface.
038     *
039     */
040    @SuppressWarnings("unchecked")
041    @InterfaceAudience.Public
042    @InterfaceStability.Evolving
043    public class AvroReflectSerialization extends AvroSerialization<Object>{
044    
045      /**
046       * Key to configure packages that contain classes to be serialized and 
047       * deserialized using this class. Multiple packages can be specified using 
048       * comma-separated list.
049       */
050      @InterfaceAudience.Private
051      public static final String AVRO_REFLECT_PACKAGES = "avro.reflect.pkgs";
052    
053      private Set<String> packages; 
054    
055      @InterfaceAudience.Private
056      @Override
057      public synchronized boolean accept(Class<?> c) {
058        if (packages == null) {
059          getPackages();
060        }
061        return AvroReflectSerializable.class.isAssignableFrom(c) ||
062          (c.getPackage() != null && packages.contains(c.getPackage().getName()));
063      }
064    
065      private void getPackages() {
066        String[] pkgList  = getConf().getStrings(AVRO_REFLECT_PACKAGES);
067        packages = new HashSet<String>();
068        if (pkgList != null) {
069          for (String pkg : pkgList) {
070            packages.add(pkg.trim());
071          }
072        }
073      }
074    
075      @InterfaceAudience.Private
076      @Override
077      public DatumReader getReader(Class<Object> clazz) {
078        try {
079          return new ReflectDatumReader(clazz);
080        } catch (Exception e) {
081          throw new RuntimeException(e);
082        }
083      }
084    
085      @InterfaceAudience.Private
086      @Override
087      public Schema getSchema(Object t) {
088        return ReflectData.get().getSchema(t.getClass());
089      }
090    
091      @InterfaceAudience.Private
092      @Override
093      public DatumWriter getWriter(Class<Object> clazz) {
094        return new ReflectDatumWriter();
095      }
096    
097    }