001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.hadoop.io.compress;
019    
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
036    
037    /**
038     * A global compressor/decompressor pool used to save and reuse 
039     * (possibly native) compression/decompression codecs.
040     */
041    @InterfaceAudience.Public
042    @InterfaceStability.Evolving
043    public class CodecPool {
044      private static final Log LOG = LogFactory.getLog(CodecPool.class);
045      
046      /**
047       * A global compressor pool used to save the expensive 
048       * construction/destruction of (possibly native) decompression codecs.
049       */
050      private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 
051        new HashMap<Class<Compressor>, List<Compressor>>();
052      
053      /**
054       * A global decompressor pool used to save the expensive 
055       * construction/destruction of (possibly native) decompression codecs.
056       */
057      private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 
058        new HashMap<Class<Decompressor>, List<Decompressor>>();
059    
060      private static <T> LoadingCache<Class<T>, AtomicInteger> createCache(
061          Class<T> klass) {
062        return CacheBuilder.newBuilder().build(
063            new CacheLoader<Class<T>, AtomicInteger>() {
064              @Override
065              public AtomicInteger load(Class<T> key) throws Exception {
066                return new AtomicInteger();
067              }
068            });
069      }
070    
071      /**
072       * Map to track the number of leased compressors
073       */
074      private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts =
075          createCache(Compressor.class);
076    
077       /**
078       * Map to tracks the number of leased decompressors
079       */
080      private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts =
081          createCache(Decompressor.class);
082    
083      private static <T> T borrow(Map<Class<T>, List<T>> pool,
084                                 Class<? extends T> codecClass) {
085        T codec = null;
086        
087        // Check if an appropriate codec is available
088        List<T> codecList;
089        synchronized (pool) {
090          codecList = pool.get(codecClass);
091        }
092    
093        if (codecList != null) {
094          synchronized (codecList) {
095            if (!codecList.isEmpty()) {
096              codec = codecList.remove(codecList.size() - 1);
097            }
098          }
099        }
100        
101        return codec;
102      }
103    
104      private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
105        if (codec != null) {
106          Class<T> codecClass = ReflectionUtils.getClass(codec);
107          List<T> codecList;
108          synchronized (pool) {
109            codecList = pool.get(codecClass);
110            if (codecList == null) {
111              codecList = new ArrayList<T>();
112              pool.put(codecClass, codecList);
113            }
114          }
115    
116          synchronized (codecList) {
117            codecList.add(codec);
118          }
119        }
120      }
121      
122      @SuppressWarnings("unchecked")
123      private static <T> int getLeaseCount(
124          LoadingCache<Class<T>, AtomicInteger> usageCounts,
125          Class<? extends T> codecClass) {
126        return usageCounts.getUnchecked((Class<T>) codecClass).get();
127      }
128    
129      private static <T> void updateLeaseCount(
130          LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) {
131        if (codec != null) {
132          Class<T> codecClass = ReflectionUtils.getClass(codec);
133          usageCounts.getUnchecked(codecClass).addAndGet(delta);
134        }
135      }
136    
137      /**
138       * Get a {@link Compressor} for the given {@link CompressionCodec} from the 
139       * pool or a new one.
140       *
141       * @param codec the <code>CompressionCodec</code> for which to get the 
142       *              <code>Compressor</code>
143       * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
144       * @return <code>Compressor</code> for the given 
145       *         <code>CompressionCodec</code> from the pool or a new one
146       */
147      public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
148        Compressor compressor = borrow(compressorPool, codec.getCompressorType());
149        if (compressor == null) {
150          compressor = codec.createCompressor();
151          LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
152        } else {
153          compressor.reinit(conf);
154          if(LOG.isDebugEnabled()) {
155            LOG.debug("Got recycled compressor");
156          }
157        }
158        updateLeaseCount(compressorCounts, compressor, 1);
159        return compressor;
160      }
161      
162      public static Compressor getCompressor(CompressionCodec codec) {
163        return getCompressor(codec, null);
164      }
165      
166      /**
167       * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
168       * pool or a new one.
169       *  
170       * @param codec the <code>CompressionCodec</code> for which to get the 
171       *              <code>Decompressor</code>
172       * @return <code>Decompressor</code> for the given 
173       *         <code>CompressionCodec</code> the pool or a new one
174       */
175      public static Decompressor getDecompressor(CompressionCodec codec) {
176        Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
177        if (decompressor == null) {
178          decompressor = codec.createDecompressor();
179          LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
180        } else {
181          if(LOG.isDebugEnabled()) {
182            LOG.debug("Got recycled decompressor");
183          }
184        }
185        updateLeaseCount(decompressorCounts, decompressor, 1);
186        return decompressor;
187      }
188      
189      /**
190       * Return the {@link Compressor} to the pool.
191       * 
192       * @param compressor the <code>Compressor</code> to be returned to the pool
193       */
194      public static void returnCompressor(Compressor compressor) {
195        if (compressor == null) {
196          return;
197        }
198        // if the compressor can't be reused, don't pool it.
199        if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
200          return;
201        }
202        compressor.reset();
203        payback(compressorPool, compressor);
204        updateLeaseCount(compressorCounts, compressor, -1);
205      }
206      
207      /**
208       * Return the {@link Decompressor} to the pool.
209       * 
210       * @param decompressor the <code>Decompressor</code> to be returned to the 
211       *                     pool
212       */
213      public static void returnDecompressor(Decompressor decompressor) {
214        if (decompressor == null) {
215          return;
216        }
217        // if the decompressor can't be reused, don't pool it.
218        if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
219          return;
220        }
221        decompressor.reset();
222        payback(decompressorPool, decompressor);
223        updateLeaseCount(decompressorCounts, decompressor, -1);
224      }
225    
226      /**
227       * Return the number of leased {@link Compressor}s for this
228       * {@link CompressionCodec}
229       */
230      public static int getLeasedCompressorsCount(CompressionCodec codec) {
231        return (codec == null) ? 0 : getLeaseCount(compressorCounts,
232            codec.getCompressorType());
233      }
234    
235      /**
236       * Return the number of leased {@link Decompressor}s for this
237       * {@link CompressionCodec}
238       */
239      public static int getLeasedDecompressorsCount(CompressionCodec codec) {
240        return (codec == null) ? 0 : getLeaseCount(decompressorCounts,
241            codec.getDecompressorType());
242      }
243    }