001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.io.compress; 019 020import java.util.HashSet; 021import java.util.HashMap; 022import java.util.Set; 023import java.util.Map; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.apache.hadoop.classification.InterfaceAudience; 029import org.apache.hadoop.classification.InterfaceStability; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.util.ReflectionUtils; 032 033import com.google.common.cache.CacheBuilder; 034import com.google.common.cache.CacheLoader; 035import com.google.common.cache.LoadingCache; 036 037/** 038 * A global compressor/decompressor pool used to save and reuse 039 * (possibly native) compression/decompression codecs. 040 */ 041@InterfaceAudience.Public 042@InterfaceStability.Evolving 043public class CodecPool { 044 private static final Log LOG = LogFactory.getLog(CodecPool.class); 045 046 /** 047 * A global compressor pool used to save the expensive 048 * construction/destruction of (possibly native) decompression codecs. 049 */ 050 private static final Map<Class<Compressor>, Set<Compressor>> compressorPool = 051 new HashMap<Class<Compressor>, Set<Compressor>>(); 052 053 /** 054 * A global decompressor pool used to save the expensive 055 * construction/destruction of (possibly native) decompression codecs. 056 */ 057 private static final Map<Class<Decompressor>, Set<Decompressor>> decompressorPool = 058 new HashMap<Class<Decompressor>, Set<Decompressor>>(); 059 060 private static <T> LoadingCache<Class<T>, AtomicInteger> createCache( 061 Class<T> klass) { 062 return CacheBuilder.newBuilder().build( 063 new CacheLoader<Class<T>, AtomicInteger>() { 064 @Override 065 public AtomicInteger load(Class<T> key) throws Exception { 066 return new AtomicInteger(); 067 } 068 }); 069 } 070 071 /** 072 * Map to track the number of leased compressors 073 */ 074 private static final LoadingCache<Class<Compressor>, AtomicInteger> compressorCounts = 075 createCache(Compressor.class); 076 077 /** 078 * Map to tracks the number of leased decompressors 079 */ 080 private static final LoadingCache<Class<Decompressor>, AtomicInteger> decompressorCounts = 081 createCache(Decompressor.class); 082 083 private static <T> T borrow(Map<Class<T>, Set<T>> pool, 084 Class<? extends T> codecClass) { 085 T codec = null; 086 087 // Check if an appropriate codec is available 088 Set<T> codecSet; 089 synchronized (pool) { 090 codecSet = pool.get(codecClass); 091 } 092 093 if (codecSet != null) { 094 synchronized (codecSet) { 095 if (!codecSet.isEmpty()) { 096 codec = codecSet.iterator().next(); 097 codecSet.remove(codec); 098 } 099 } 100 } 101 102 return codec; 103 } 104 105 private static <T> boolean payback(Map<Class<T>, Set<T>> pool, T codec) { 106 if (codec != null) { 107 Class<T> codecClass = ReflectionUtils.getClass(codec); 108 Set<T> codecSet; 109 synchronized (pool) { 110 codecSet = pool.get(codecClass); 111 if (codecSet == null) { 112 codecSet = new HashSet<T>(); 113 pool.put(codecClass, codecSet); 114 } 115 } 116 117 synchronized (codecSet) { 118 return codecSet.add(codec); 119 } 120 } 121 return false; 122 } 123 124 @SuppressWarnings("unchecked") 125 private static <T> int getLeaseCount( 126 LoadingCache<Class<T>, AtomicInteger> usageCounts, 127 Class<? extends T> codecClass) { 128 return usageCounts.getUnchecked((Class<T>) codecClass).get(); 129 } 130 131 private static <T> void updateLeaseCount( 132 LoadingCache<Class<T>, AtomicInteger> usageCounts, T codec, int delta) { 133 if (codec != null) { 134 Class<T> codecClass = ReflectionUtils.getClass(codec); 135 usageCounts.getUnchecked(codecClass).addAndGet(delta); 136 } 137 } 138 139 /** 140 * Get a {@link Compressor} for the given {@link CompressionCodec} from the 141 * pool or a new one. 142 * 143 * @param codec the <code>CompressionCodec</code> for which to get the 144 * <code>Compressor</code> 145 * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor 146 * @return <code>Compressor</code> for the given 147 * <code>CompressionCodec</code> from the pool or a new one 148 */ 149 public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { 150 Compressor compressor = borrow(compressorPool, codec.getCompressorType()); 151 if (compressor == null) { 152 compressor = codec.createCompressor(); 153 LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]"); 154 } else { 155 compressor.reinit(conf); 156 if(LOG.isDebugEnabled()) { 157 LOG.debug("Got recycled compressor"); 158 } 159 } 160 if (compressor != null && 161 !compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 162 updateLeaseCount(compressorCounts, compressor, 1); 163 } 164 return compressor; 165 } 166 167 public static Compressor getCompressor(CompressionCodec codec) { 168 return getCompressor(codec, null); 169 } 170 171 /** 172 * Get a {@link Decompressor} for the given {@link CompressionCodec} from the 173 * pool or a new one. 174 * 175 * @param codec the <code>CompressionCodec</code> for which to get the 176 * <code>Decompressor</code> 177 * @return <code>Decompressor</code> for the given 178 * <code>CompressionCodec</code> the pool or a new one 179 */ 180 public static Decompressor getDecompressor(CompressionCodec codec) { 181 Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType()); 182 if (decompressor == null) { 183 decompressor = codec.createDecompressor(); 184 LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]"); 185 } else { 186 if(LOG.isDebugEnabled()) { 187 LOG.debug("Got recycled decompressor"); 188 } 189 } 190 if (decompressor != null && 191 !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 192 updateLeaseCount(decompressorCounts, decompressor, 1); 193 } 194 return decompressor; 195 } 196 197 /** 198 * Return the {@link Compressor} to the pool. 199 * 200 * @param compressor the <code>Compressor</code> to be returned to the pool 201 */ 202 public static void returnCompressor(Compressor compressor) { 203 if (compressor == null) { 204 return; 205 } 206 // if the compressor can't be reused, don't pool it. 207 if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 208 return; 209 } 210 compressor.reset(); 211 if (payback(compressorPool, compressor)) { 212 updateLeaseCount(compressorCounts, compressor, -1); 213 } 214 } 215 216 /** 217 * Return the {@link Decompressor} to the pool. 218 * 219 * @param decompressor the <code>Decompressor</code> to be returned to the 220 * pool 221 */ 222 public static void returnDecompressor(Decompressor decompressor) { 223 if (decompressor == null) { 224 return; 225 } 226 // if the decompressor can't be reused, don't pool it. 227 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 228 return; 229 } 230 decompressor.reset(); 231 if (payback(decompressorPool, decompressor)) { 232 updateLeaseCount(decompressorCounts, decompressor, -1); 233 } 234 } 235 236 /** 237 * Return the number of leased {@link Compressor}s for this 238 * {@link CompressionCodec} 239 */ 240 public static int getLeasedCompressorsCount(CompressionCodec codec) { 241 return (codec == null) ? 0 : getLeaseCount(compressorCounts, 242 codec.getCompressorType()); 243 } 244 245 /** 246 * Return the number of leased {@link Decompressor}s for this 247 * {@link CompressionCodec} 248 */ 249 public static int getLeasedDecompressorsCount(CompressionCodec codec) { 250 return (codec == null) ? 0 : getLeaseCount(decompressorCounts, 251 codec.getDecompressorType()); 252 } 253}