001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.sharedcache;
020
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.ConcurrentMap;
023
024import org.apache.hadoop.classification.InterfaceAudience.Public;
025import org.apache.hadoop.classification.InterfaceStability.Evolving;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.util.ReflectionUtils;
028import org.apache.hadoop.yarn.conf.YarnConfiguration;
029import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
030
031@SuppressWarnings("unchecked")
032@Public
033@Evolving
034/**
035 * A factory class for creating checksum objects based on a configurable
036 * algorithm implementation
037 */
038public class SharedCacheChecksumFactory {
039  private static final
040      ConcurrentMap<Class<? extends SharedCacheChecksum>,SharedCacheChecksum>
041      instances =
042          new ConcurrentHashMap<Class<? extends SharedCacheChecksum>,
043          SharedCacheChecksum>();
044
045  private static final Class<? extends SharedCacheChecksum> defaultAlgorithm;
046
047  static {
048    try {
049      defaultAlgorithm = (Class<? extends SharedCacheChecksum>)
050          Class.forName(
051              YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
052    } catch (Exception e) {
053      // cannot happen
054      throw new ExceptionInInitializerError(e);
055    }
056  }
057
058  /**
059   * Get a new <code>SharedCacheChecksum</code> object based on the configurable
060   * algorithm implementation
061   * (see <code>yarn.sharedcache.checksum.algo.impl</code>)
062   *
063   * @return <code>SharedCacheChecksum</code> object
064   */
065  public static SharedCacheChecksum getChecksum(Configuration conf) {
066    Class<? extends SharedCacheChecksum> clazz =
067        conf.getClass(YarnConfiguration.SHARED_CACHE_CHECKSUM_ALGO_IMPL,
068        defaultAlgorithm, SharedCacheChecksum.class);
069    SharedCacheChecksum checksum = instances.get(clazz);
070    if (checksum == null) {
071      try {
072        checksum = ReflectionUtils.newInstance(clazz, conf);
073        SharedCacheChecksum old = instances.putIfAbsent(clazz, checksum);
074        if (old != null) {
075          checksum = old;
076        }
077      } catch (Exception e) {
078        throw new YarnRuntimeException(e);
079      }
080    }
081
082    return checksum;
083  }
084}