001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.io.compress;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.util.ReflectionUtils;
031
032/**
033 * A global compressor/decompressor pool used to save and reuse 
034 * (possibly native) compression/decompression codecs.
035 */
036@InterfaceAudience.Public
037@InterfaceStability.Evolving
038public class CodecPool {
039  private static final Log LOG = LogFactory.getLog(CodecPool.class);
040  
041  /**
042   * A global compressor pool used to save the expensive 
043   * construction/destruction of (possibly native) decompression codecs.
044   */
045  private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 
046    new HashMap<Class<Compressor>, List<Compressor>>();
047  
048  /**
049   * A global decompressor pool used to save the expensive 
050   * construction/destruction of (possibly native) decompression codecs.
051   */
052  private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 
053    new HashMap<Class<Decompressor>, List<Decompressor>>();
054
055  private static <T> T borrow(Map<Class<T>, List<T>> pool,
056                             Class<? extends T> codecClass) {
057    T codec = null;
058    
059    // Check if an appropriate codec is available
060    synchronized (pool) {
061      if (pool.containsKey(codecClass)) {
062        List<T> codecList = pool.get(codecClass);
063        
064        if (codecList != null) {
065          synchronized (codecList) {
066            if (!codecList.isEmpty()) {
067              codec = codecList.remove(codecList.size()-1);
068            }
069          }
070        }
071      }
072    }
073    
074    return codec;
075  }
076
077  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
078    if (codec != null) {
079      Class<T> codecClass = ReflectionUtils.getClass(codec);
080      synchronized (pool) {
081        if (!pool.containsKey(codecClass)) {
082          pool.put(codecClass, new ArrayList<T>());
083        }
084
085        List<T> codecList = pool.get(codecClass);
086        synchronized (codecList) {
087          codecList.add(codec);
088        }
089      }
090    }
091  }
092  
093  /**
094   * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
095   * pool or a new one.
096   *
097   * @param codec the <code>CompressionCodec</code> for which to get the 
098   *              <code>Compressor</code>
099   * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
100   * @return <code>Compressor</code> for the given 
101   *         <code>CompressionCodec</code> from the pool or a new one
102   */
103  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
104    Compressor compressor = borrow(compressorPool, codec.getCompressorType());
105    if (compressor == null) {
106      compressor = codec.createCompressor();
107      LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
108    } else {
109      compressor.reinit(conf);
110      if(LOG.isDebugEnabled()) {
111        LOG.debug("Got recycled compressor");
112      }
113    }
114    return compressor;
115  }
116  
117  public static Compressor getCompressor(CompressionCodec codec) {
118    return getCompressor(codec, null);
119  }
120  
121  /**
122   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
123   * pool or a new one.
124   *  
125   * @param codec the <code>CompressionCodec</code> for which to get the 
126   *              <code>Decompressor</code>
127   * @return <code>Decompressor</code> for the given 
128   *         <code>CompressionCodec</code> the pool or a new one
129   */
130  public static Decompressor getDecompressor(CompressionCodec codec) {
131    Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
132    if (decompressor == null) {
133      decompressor = codec.createDecompressor();
134      LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
135    } else {
136      if(LOG.isDebugEnabled()) {
137        LOG.debug("Got recycled decompressor");
138      }
139    }
140    return decompressor;
141  }
142  
143  /**
144   * Return the {@link Compressor} to the pool.
145   * 
146   * @param compressor the <code>Compressor</code> to be returned to the pool
147   */
148  public static void returnCompressor(Compressor compressor) {
149    if (compressor == null) {
150      return;
151    }
152    // if the compressor can't be reused, don't pool it.
153    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
154      return;
155    }
156    compressor.reset();
157    payback(compressorPool, compressor);
158  }
159  
160  /**
161   * Return the {@link Decompressor} to the pool.
162   * 
163   * @param decompressor the <code>Decompressor</code> to be returned to the 
164   *                     pool
165   */
166  public static void returnDecompressor(Decompressor decompressor) {
167    if (decompressor == null) {
168      return;
169    }
170    // if the decompressor can't be reused, don't pool it.
171    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
172      return;
173    }
174    decompressor.reset();
175    payback(decompressorPool, decompressor);
176  }
177}