001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.io.compress;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.util.ReflectionUtils;
032
033import com.google.common.cache.CacheBuilder;
034import com.google.common.cache.CacheLoader;
035import com.google.common.cache.LoadingCache;
036
037/**
038 * A global compressor/decompressor pool used to save and reuse 
039 * (possibly native) compression/decompression codecs.
040 */
041@InterfaceAudience.Public
042@InterfaceStability.Evolving
043public class CodecPool {
044  private static final Log LOG = LogFactory.getLog(CodecPool.class);
045  
046  /**
047   * A global compressor pool used to save the expensive 
048   * construction/destruction of (possibly native) decompression codecs.
049   */
050  private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 
051    new HashMap<Class<Compressor>, List<Compressor>>();
052  
053  /**
054   * A global decompressor pool used to save the expensive 
055   * construction/destruction of (possibly native) decompression codecs.
056   */
057  private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 
058    new HashMap<Class<Decompressor>, List<Decompressor>>();
059
060  private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
061      Class<T> klass) {
062    return CacheBuilder.newBuilder().build(
063        new CacheLoader<Class<T>, AtomicInteger>() {
064          @Override
065          public AtomicInteger load(Class<T> key) throws Exception {
066            return new AtomicInteger();
067          }
068        });
069  }
070
071  /**
072   * Map to track the number of leased compressors
073   */
074  private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts =
075      createCache(Compressor.class);
076
077   /**
078   * Map to tracks the number of leased decompressors
079   */
080  private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts =
081      createCache(Decompressor.class);
082
083  private static <T> T borrow(Map<Class<T>, List<T>> pool,
084                             Class<? extends T> codecClass) {
085    T codec = null;
086    
087    // Check if an appropriate codec is available
088    List<T> codecList;
089    synchronized (pool) {
090      codecList = pool.get(codecClass);
091    }
092
093    if (codecList != null) {
094      synchronized (codecList) {
095        if (!codecList.isEmpty()) {
096          codec = codecList.remove(codecList.size() - 1);
097        }
098      }
099    }
100    
101    return codec;
102  }
103
104  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
105    if (codec != null) {
106      Class<T> codecClass = ReflectionUtils.getClass(codec);
107      List<T> codecList;
108      synchronized (pool) {
109        codecList = pool.get(codecClass);
110        if (codecList == null) {
111          codecList = new ArrayList<T>();
112          pool.put(codecClass, codecList);
113        }
114      }
115
116      synchronized (codecList) {
117        codecList.add(codec);
118      }
119    }
120  }
121  
122  @SuppressWarnings("unchecked")
123  private static <T> int getLeaseCount(
124      LoadingCache<Class<T>, AtomicInteger> usageCounts,
125      Class<? extends T> codecClass) {
126    return usageCounts.getUnchecked((Class<T>) codecClass).get();
127  }
128
129  private static <T> void updateLeaseCount(
130      LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) {
131    if (codec != null) {
132      Class<T> codecClass = ReflectionUtils.getClass(codec);
133      usageCounts.getUnchecked(codecClass).addAndGet(delta);
134    }
135  }
136
137  /**
138   * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
139   * pool or a new one.
140   *
141   * @param codec the <code>CompressionCodec</code> for which to get the 
142   *              <code>Compressor</code>
143   * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
144   * @return <code>Compressor</code> for the given 
145   *         <code>CompressionCodec</code> from the pool or a new one
146   */
147  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
148    Compressor compressor = borrow(compressorPool, codec.getCompressorType());
149    if (compressor == null) {
150      compressor = codec.createCompressor();
151      LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
152    } else {
153      compressor.reinit(conf);
154      if(LOG.isDebugEnabled()) {
155        LOG.debug("Got recycled compressor");
156      }
157    }
158    updateLeaseCount(compressorCounts, compressor, 1);
159    return compressor;
160  }
161  
162  public static Compressor getCompressor(CompressionCodec codec) {
163    return getCompressor(codec, null);
164  }
165  
166  /**
167   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
168   * pool or a new one.
169   *  
170   * @param codec the <code>CompressionCodec</code> for which to get the 
171   *              <code>Decompressor</code>
172   * @return <code>Decompressor</code> for the given 
173   *         <code>CompressionCodec</code> the pool or a new one
174   */
175  public static Decompressor getDecompressor(CompressionCodec codec) {
176    Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
177    if (decompressor == null) {
178      decompressor = codec.createDecompressor();
179      LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
180    } else {
181      if(LOG.isDebugEnabled()) {
182        LOG.debug("Got recycled decompressor");
183      }
184    }
185    updateLeaseCount(decompressorCounts, decompressor, 1);
186    return decompressor;
187  }
188  
189  /**
190   * Return the {@link Compressor} to the pool.
191   * 
192   * @param compressor the <code>Compressor</code> to be returned to the pool
193   */
194  public static void returnCompressor(Compressor compressor) {
195    if (compressor == null) {
196      return;
197    }
198    // if the compressor can't be reused, don't pool it.
199    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
200      return;
201    }
202    compressor.reset();
203    payback(compressorPool, compressor);
204    updateLeaseCount(compressorCounts, compressor, -1);
205  }
206  
207  /**
208   * Return the {@link Decompressor} to the pool.
209   * 
210   * @param decompressor the <code>Decompressor</code> to be returned to the 
211   *                     pool
212   */
213  public static void returnDecompressor(Decompressor decompressor) {
214    if (decompressor == null) {
215      return;
216    }
217    // if the decompressor can't be reused, don't pool it.
218    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
219      return;
220    }
221    decompressor.reset();
222    payback(decompressorPool, decompressor);
223    updateLeaseCount(decompressorCounts, decompressor, -1);
224  }
225
226  /**
227   * Return the number of leased {@link Compressor}s for this
228   * {@link CompressionCodec}
229   */
230  public static int getLeasedCompressorsCount(CompressionCodec codec) {
231    return (codec == null) ? 0 : getLeaseCount(compressorCounts,
232        codec.getCompressorType());
233  }
234
235  /**
236   * Return the number of leased {@link Decompressor}s for this
237   * {@link CompressionCodec}
238   */
239  public static int getLeasedDecompressorsCount(CompressionCodec codec) {
240    return (codec == null) ? 0 : getLeaseCount(decompressorCounts,
241        codec.getDecompressorType());
242  }
243}