001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.io.compress;
019
020import java.util.HashSet;
021import java.util.HashMap;
022import java.util.Set;
023import java.util.Map;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.apache.hadoop.classification.InterfaceAudience;
029import org.apache.hadoop.classification.InterfaceStability;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.util.ReflectionUtils;
032
033import com.google.common.cache.CacheBuilder;
034import com.google.common.cache.CacheLoader;
035import com.google.common.cache.LoadingCache;
036
037/**
038 * A global compressor/decompressor pool used to save and reuse 
039 * (possibly native) compression/decompression codecs.
040 */
041@InterfaceAudience.Public
042@InterfaceStability.Evolving
043public class CodecPool {
044  private static final Log LOG = LogFactory.getLog(CodecPool.class);
045  
046  /**
047   * A global compressor pool used to save the expensive 
048   * construction/destruction of (possibly native) decompression codecs.
049   */
050  private static final Map<Class<Compressor>, Set<Compressor>> compressorPool =
051    new HashMap<Class<Compressor>, Set<Compressor>>();
052  
053  /**
054   * A global decompressor pool used to save the expensive 
055   * construction/destruction of (possibly native) decompression codecs.
056   */
057  private static final Map<Class<Decompressor>, Set<Decompressor>> decompressorPool =
058    new HashMap<Class<Decompressor>, Set<Decompressor>>();
059
060  private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
061      Class<T> klass) {
062    return CacheBuilder.newBuilder().build(
063        new CacheLoader<Class<T>, AtomicInteger>() {
064          @Override
065          public AtomicInteger load(Class<T> key) throws Exception {
066            return new AtomicInteger();
067          }
068        });
069  }
070
071  /**
072   * Map to track the number of leased compressors
073   */
074  private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts =
075      createCache(Compressor.class);
076
077   /**
078   * Map to tracks the number of leased decompressors
079   */
080  private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts =
081      createCache(Decompressor.class);
082
083  private static <T> T borrow(Map<Class<T>, Set<T>> pool,
084                             Class<? extends T> codecClass) {
085    T codec = null;
086    
087    // Check if an appropriate codec is available
088    Set<T> codecSet;
089    synchronized (pool) {
090      codecSet = pool.get(codecClass);
091    }
092
093    if (codecSet != null) {
094      synchronized (codecSet) {
095        if (!codecSet.isEmpty()) {
096          codec = codecSet.iterator().next();
097          codecSet.remove(codec);
098        }
099      }
100    }
101    
102    return codec;
103  }
104
105  private static <T> boolean payback(Map<Class<T>, Set<T>> pool, T codec) {
106    if (codec != null) {
107      Class<T> codecClass = ReflectionUtils.getClass(codec);
108      Set<T> codecSet;
109      synchronized (pool) {
110        codecSet = pool.get(codecClass);
111        if (codecSet == null) {
112          codecSet = new HashSet<T>();
113          pool.put(codecClass, codecSet);
114        }
115      }
116
117      synchronized (codecSet) {
118        return codecSet.add(codec);
119      }
120    }
121    return false;
122  }
123  
124  @SuppressWarnings("unchecked")
125  private static <T> int getLeaseCount(
126      LoadingCache<Class<T>, AtomicInteger> usageCounts,
127      Class<? extends T> codecClass) {
128    return usageCounts.getUnchecked((Class<T>) codecClass).get();
129  }
130
131  private static <T> void updateLeaseCount(
132      LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) {
133    if (codec != null) {
134      Class<T> codecClass = ReflectionUtils.getClass(codec);
135      usageCounts.getUnchecked(codecClass).addAndGet(delta);
136    }
137  }
138
139  /**
140   * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
141   * pool or a new one.
142   *
143   * @param codec the <code>CompressionCodec</code> for which to get the 
144   *              <code>Compressor</code>
145   * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
146   * @return <code>Compressor</code> for the given 
147   *         <code>CompressionCodec</code> from the pool or a new one
148   */
149  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
150    Compressor compressor = borrow(compressorPool, codec.getCompressorType());
151    if (compressor == null) {
152      compressor = codec.createCompressor();
153      LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
154    } else {
155      compressor.reinit(conf);
156      if(LOG.isDebugEnabled()) {
157        LOG.debug("Got recycled compressor");
158      }
159    }
160    if (compressor != null &&
161        !compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
162      updateLeaseCount(compressorCounts, compressor, 1);
163    }
164    return compressor;
165  }
166  
167  public static Compressor getCompressor(CompressionCodec codec) {
168    return getCompressor(codec, null);
169  }
170  
171  /**
172   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
173   * pool or a new one.
174   *  
175   * @param codec the <code>CompressionCodec</code> for which to get the 
176   *              <code>Decompressor</code>
177   * @return <code>Decompressor</code> for the given 
178   *         <code>CompressionCodec</code> the pool or a new one
179   */
180  public static Decompressor getDecompressor(CompressionCodec codec) {
181    Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
182    if (decompressor == null) {
183      decompressor = codec.createDecompressor();
184      LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
185    } else {
186      if(LOG.isDebugEnabled()) {
187        LOG.debug("Got recycled decompressor");
188      }
189    }
190    if (decompressor != null &&
191        !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
192      updateLeaseCount(decompressorCounts, decompressor, 1);
193    }
194    return decompressor;
195  }
196  
197  /**
198   * Return the {@link Compressor} to the pool.
199   * 
200   * @param compressor the <code>Compressor</code> to be returned to the pool
201   */
202  public static void returnCompressor(Compressor compressor) {
203    if (compressor == null) {
204      return;
205    }
206    // if the compressor can't be reused, don't pool it.
207    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
208      return;
209    }
210    compressor.reset();
211    if (payback(compressorPool, compressor)) {
212      updateLeaseCount(compressorCounts, compressor, -1);
213    }
214  }
215  
216  /**
217   * Return the {@link Decompressor} to the pool.
218   * 
219   * @param decompressor the <code>Decompressor</code> to be returned to the 
220   *                     pool
221   */
222  public static void returnDecompressor(Decompressor decompressor) {
223    if (decompressor == null) {
224      return;
225    }
226    // if the decompressor can't be reused, don't pool it.
227    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
228      return;
229    }
230    decompressor.reset();
231    if (payback(decompressorPool, decompressor)) {
232      updateLeaseCount(decompressorCounts, decompressor, -1);
233    }
234  }
235
236  /**
237   * Return the number of leased {@link Compressor}s for this
238   * {@link CompressionCodec}
239   */
240  public static int getLeasedCompressorsCount(CompressionCodec codec) {
241    return (codec == null) ? 0 : getLeaseCount(compressorCounts,
242        codec.getCompressorType());
243  }
244
245  /**
246   * Return the number of leased {@link Decompressor}s for this
247   * {@link CompressionCodec}
248   */
249  public static int getLeasedDecompressorsCount(CompressionCodec codec) {
250    return (codec == null) ? 0 : getLeaseCount(decompressorCounts,
251        codec.getDecompressorType());
252  }
253}