001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.io.compress; 019 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.apache.hadoop.classification.InterfaceAudience; 029import org.apache.hadoop.classification.InterfaceStability; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.util.ReflectionUtils; 032 033import com.google.common.cache.CacheBuilder; 034import com.google.common.cache.CacheLoader; 035import com.google.common.cache.LoadingCache; 036 037/** 038 * A global compressor/decompressor pool used to save and reuse 039 * (possibly native) compression/decompression codecs. 040 */ 041@InterfaceAudience.Public 042@InterfaceStability.Evolving 043public class CodecPool { 044 private static final Log LOG = LogFactory.getLog(CodecPool.class); 045 046 /** 047 * A global compressor pool used to save the expensive 048 * construction/destruction of (possibly native) decompression codecs. 049 */ 050 private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 051 new HashMap<Class<Compressor>, List<Compressor>>(); 052 053 /** 054 * A global decompressor pool used to save the expensive 055 * construction/destruction of (possibly native) decompression codecs. 056 */ 057 private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 058 new HashMap<Class<Decompressor>, List<Decompressor>>(); 059 060 private static <T> LoadingCache<Class<T>, AtomicInteger> createCache( 061 Class<T> klass) { 062 return CacheBuilder.newBuilder().build( 063 new CacheLoader<Class<T>, AtomicInteger>() { 064 @Override 065 public AtomicInteger load(Class<T> key) throws Exception { 066 return new AtomicInteger(); 067 } 068 }); 069 } 070 071 /** 072 * Map to track the number of leased compressors 073 */ 074 private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = 075 createCache(Compressor.class); 076 077 /** 078 * Map to tracks the number of leased decompressors 079 */ 080 private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = 081 createCache(Decompressor.class); 082 083 private static <T> T borrow(Map<Class<T>, List<T>> pool, 084 Class<? extends T> codecClass) { 085 T codec = null; 086 087 // Check if an appropriate codec is available 088 List<T> codecList; 089 synchronized (pool) { 090 codecList = pool.get(codecClass); 091 } 092 093 if (codecList != null) { 094 synchronized (codecList) { 095 if (!codecList.isEmpty()) { 096 codec = codecList.remove(codecList.size() - 1); 097 } 098 } 099 } 100 101 return codec; 102 } 103 104 private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) { 105 if (codec != null) { 106 Class<T> codecClass = ReflectionUtils.getClass(codec); 107 List<T> codecList; 108 synchronized (pool) { 109 codecList = pool.get(codecClass); 110 if (codecList == null) { 111 codecList = new ArrayList<T>(); 112 pool.put(codecClass, codecList); 113 } 114 } 115 116 synchronized (codecList) { 117 codecList.add(codec); 118 } 119 } 120 } 121 122 @SuppressWarnings("unchecked") 123 private static <T> int getLeaseCount( 124 LoadingCache<Class<T>, AtomicInteger> usageCounts, 125 Class<? extends T> codecClass) { 126 return usageCounts.getUnchecked((Class<T>) codecClass).get(); 127 } 128 129 private static <T> void updateLeaseCount( 130 LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) { 131 if (codec != null) { 132 Class<T> codecClass = ReflectionUtils.getClass(codec); 133 usageCounts.getUnchecked(codecClass).addAndGet(delta); 134 } 135 } 136 137 /** 138 * Get a {@link Compressor} for the given {@link CompressionCodec} from the 139 * pool or a new one. 140 * 141 * @param codec the <code>CompressionCodec</code> for which to get the 142 * <code>Compressor</code> 143 * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor 144 * @return <code>Compressor</code> for the given 145 * <code>CompressionCodec</code> from the pool or a new one 146 */ 147 public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { 148 Compressor compressor = borrow(compressorPool, codec.getCompressorType()); 149 if (compressor == null) { 150 compressor = codec.createCompressor(); 151 LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]"); 152 } else { 153 compressor.reinit(conf); 154 if(LOG.isDebugEnabled()) { 155 LOG.debug("Got recycled compressor"); 156 } 157 } 158 updateLeaseCount(compressorCounts, compressor, 1); 159 return compressor; 160 } 161 162 public static Compressor getCompressor(CompressionCodec codec) { 163 return getCompressor(codec, null); 164 } 165 166 /** 167 * Get a {@link Decompressor} for the given {@link CompressionCodec} from the 168 * pool or a new one. 169 * 170 * @param codec the <code>CompressionCodec</code> for which to get the 171 * <code>Decompressor</code> 172 * @return <code>Decompressor</code> for the given 173 * <code>CompressionCodec</code> the pool or a new one 174 */ 175 public static Decompressor getDecompressor(CompressionCodec codec) { 176 Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType()); 177 if (decompressor == null) { 178 decompressor = codec.createDecompressor(); 179 LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]"); 180 } else { 181 if(LOG.isDebugEnabled()) { 182 LOG.debug("Got recycled decompressor"); 183 } 184 } 185 updateLeaseCount(decompressorCounts, decompressor, 1); 186 return decompressor; 187 } 188 189 /** 190 * Return the {@link Compressor} to the pool. 191 * 192 * @param compressor the <code>Compressor</code> to be returned to the pool 193 */ 194 public static void returnCompressor(Compressor compressor) { 195 if (compressor == null) { 196 return; 197 } 198 // if the compressor can't be reused, don't pool it. 199 if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 200 return; 201 } 202 compressor.reset(); 203 payback(compressorPool, compressor); 204 updateLeaseCount(compressorCounts, compressor, -1); 205 } 206 207 /** 208 * Return the {@link Decompressor} to the pool. 209 * 210 * @param decompressor the <code>Decompressor</code> to be returned to the 211 * pool 212 */ 213 public static void returnDecompressor(Decompressor decompressor) { 214 if (decompressor == null) { 215 return; 216 } 217 // if the decompressor can't be reused, don't pool it. 218 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 219 return; 220 } 221 decompressor.reset(); 222 payback(decompressorPool, decompressor); 223 updateLeaseCount(decompressorCounts, decompressor, -1); 224 } 225 226 /** 227 * Return the number of leased {@link Compressor}s for this 228 * {@link CompressionCodec} 229 */ 230 public static int getLeasedCompressorsCount(CompressionCodec codec) { 231 return (codec == null) ? 0 : getLeaseCount(compressorCounts, 232 codec.getCompressorType()); 233 } 234 235 /** 236 * Return the number of leased {@link Decompressor}s for this 237 * {@link CompressionCodec} 238 */ 239 public static int getLeasedDecompressorsCount(CompressionCodec codec) { 240 return (codec == null) ? 0 : getLeaseCount(decompressorCounts, 241 codec.getDecompressorType()); 242 } 243}