001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.io.compress;
019
020import java.util.HashSet;
021import java.util.HashMap;
022import java.util.Set;
023import java.util.Map;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.util.ReflectionUtils;
032
033import com.google.common.cache.CacheBuilder;
034import com.google.common.cache.CacheLoader;
035import com.google.common.cache.LoadingCache;
036
037/**
038 * A global compressor/decompressor pool used to save and reuse 
039 * (possibly native) compression/decompression codecs.
040 */
041@InterfaceAudience.Public
042@InterfaceStability.Evolving
043public class CodecPool {
044  private static final Log LOG = LogFactory.getLog(CodecPool.class);
045  
046  /**
047   * A global compressor pool used to save the expensive 
048   * construction/destruction of (possibly native) decompression codecs.
049   */
050  private static final Map<Class<Compressor>, Set<Compressor>> compressorPool =
051    new HashMap<Class<Compressor>, Set<Compressor>>();
052  
053  /**
054   * A global decompressor pool used to save the expensive 
055   * construction/destruction of (possibly native) decompression codecs.
056   */
057  private static final Map<Class<Decompressor>, Set<Decompressor>> decompressorPool =
058    new HashMap<Class<Decompressor>, Set<Decompressor>>();
059
060  private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
061      Class<T> klass) {
062    return CacheBuilder.newBuilder().build(
063        new CacheLoader<Class<T>, AtomicInteger>() {
064          @Override
065          public AtomicInteger load(Class<T> key) throws Exception {
066            return new AtomicInteger();
067          }
068        });
069  }
070
071  /**
072   * Map to track the number of leased compressors
073   */
074  private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts =
075      createCache(Compressor.class);
076
077   /**
078   * Map to tracks the number of leased decompressors
079   */
080  private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts =
081      createCache(Decompressor.class);
082
083  private static <T> T borrow(Map<Class<T>, Set<T>> pool,
084                             Class<? extends T> codecClass) {
085    T codec = null;
086    
087    // Check if an appropriate codec is available
088    Set<T> codecSet;
089    synchronized (pool) {
090      codecSet = pool.get(codecClass);
091    }
092
093    if (codecSet != null) {
094      synchronized (codecSet) {
095        if (!codecSet.isEmpty()) {
096          codec = codecSet.iterator().next();
097          codecSet.remove(codec);
098        }
099      }
100    }
101    
102    return codec;
103  }
104
105  private static <T> boolean payback(Map<Class<T>, Set<T>> pool, T codec) {
106    if (codec != null) {
107      Class<T> codecClass = ReflectionUtils.getClass(codec);
108      Set<T> codecSet;
109      synchronized (pool) {
110        codecSet = pool.get(codecClass);
111        if (codecSet == null) {
112          codecSet = new HashSet<T>();
113          pool.put(codecClass, codecSet);
114        }
115      }
116
117      synchronized (codecSet) {
118        return codecSet.add(codec);
119      }
120    }
121    return false;
122  }
123  
124  @SuppressWarnings("unchecked")
125  private static <T> int getLeaseCount(
126      LoadingCache<Class<T>, AtomicInteger> usageCounts,
127      Class<? extends T> codecClass) {
128    return usageCounts.getUnchecked((Class<T>) codecClass).get();
129  }
130
131  private static <T> void updateLeaseCount(
132      LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) {
133    if (codec != null) {
134      Class<T> codecClass = ReflectionUtils.getClass(codec);
135      usageCounts.getUnchecked(codecClass).addAndGet(delta);
136    }
137  }
138
139  /**
140   * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
141   * pool or a new one.
142   *
143   * @param codec the <code>CompressionCodec</code> for which to get the 
144   *              <code>Compressor</code>
145   * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
146   * @return <code>Compressor</code> for the given 
147   *         <code>CompressionCodec</code> from the pool or a new one
148   */
149  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
150    Compressor compressor = borrow(compressorPool, codec.getCompressorType());
151    if (compressor == null) {
152      compressor = codec.createCompressor();
153      LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
154    } else {
155      compressor.reinit(conf);
156      if(LOG.isDebugEnabled()) {
157        LOG.debug("Got recycled compressor");
158      }
159    }
160    updateLeaseCount(compressorCounts, compressor, 1);
161    return compressor;
162  }
163  
164  public static Compressor getCompressor(CompressionCodec codec) {
165    return getCompressor(codec, null);
166  }
167  
168  /**
169   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
170   * pool or a new one.
171   *  
172   * @param codec the <code>CompressionCodec</code> for which to get the 
173   *              <code>Decompressor</code>
174   * @return <code>Decompressor</code> for the given 
175   *         <code>CompressionCodec</code> the pool or a new one
176   */
177  public static Decompressor getDecompressor(CompressionCodec codec) {
178    Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
179    if (decompressor == null) {
180      decompressor = codec.createDecompressor();
181      LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
182    } else {
183      if(LOG.isDebugEnabled()) {
184        LOG.debug("Got recycled decompressor");
185      }
186    }
187    updateLeaseCount(decompressorCounts, decompressor, 1);
188    return decompressor;
189  }
190  
191  /**
192   * Return the {@link Compressor} to the pool.
193   * 
194   * @param compressor the <code>Compressor</code> to be returned to the pool
195   */
196  public static void returnCompressor(Compressor compressor) {
197    if (compressor == null) {
198      return;
199    }
200    // if the compressor can't be reused, don't pool it.
201    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
202      return;
203    }
204    compressor.reset();
205    if (payback(compressorPool, compressor)) {
206      updateLeaseCount(compressorCounts, compressor, -1);
207    }
208  }
209  
210  /**
211   * Return the {@link Decompressor} to the pool.
212   * 
213   * @param decompressor the <code>Decompressor</code> to be returned to the 
214   *                     pool
215   */
216  public static void returnDecompressor(Decompressor decompressor) {
217    if (decompressor == null) {
218      return;
219    }
220    // if the decompressor can't be reused, don't pool it.
221    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
222      return;
223    }
224    decompressor.reset();
225    if (payback(decompressorPool, decompressor)) {
226      updateLeaseCount(decompressorCounts, decompressor, -1);
227    }
228  }
229
230  /**
231   * Return the number of leased {@link Compressor}s for this
232   * {@link CompressionCodec}
233   */
234  public static int getLeasedCompressorsCount(CompressionCodec codec) {
235    return (codec == null) ? 0 : getLeaseCount(compressorCounts,
236        codec.getCompressorType());
237  }
238
239  /**
240   * Return the number of leased {@link Decompressor}s for this
241   * {@link CompressionCodec}
242   */
243  public static int getLeasedDecompressorsCount(CompressionCodec codec) {
244    return (codec == null) ? 0 : getLeaseCount(decompressorCounts,
245        codec.getDecompressorType());
246  }
247}