001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.io.compress; 019 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024 025import org.apache.commons.logging.Log; 026import org.apache.commons.logging.LogFactory; 027import org.apache.hadoop.classification.InterfaceAudience; 028import org.apache.hadoop.classification.InterfaceStability; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.util.ReflectionUtils; 031 032/** 033 * A global compressor/decompressor pool used to save and reuse 034 * (possibly native) compression/decompression codecs. 035 */ 036@InterfaceAudience.Public 037@InterfaceStability.Evolving 038public class CodecPool { 039 private static final Log LOG = LogFactory.getLog(CodecPool.class); 040 041 /** 042 * A global compressor pool used to save the expensive 043 * construction/destruction of (possibly native) decompression codecs. 044 */ 045 private static final Map<Class<Compressor>, List<Compressor>> compressorPool = 046 new HashMap<Class<Compressor>, List<Compressor>>(); 047 048 /** 049 * A global decompressor pool used to save the expensive 050 * construction/destruction of (possibly native) decompression codecs. 051 */ 052 private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool = 053 new HashMap<Class<Decompressor>, List<Decompressor>>(); 054 055 private static <T> T borrow(Map<Class<T>, List<T>> pool, 056 Class<? extends T> codecClass) { 057 T codec = null; 058 059 // Check if an appropriate codec is available 060 synchronized (pool) { 061 if (pool.containsKey(codecClass)) { 062 List<T> codecList = pool.get(codecClass); 063 064 if (codecList != null) { 065 synchronized (codecList) { 066 if (!codecList.isEmpty()) { 067 codec = codecList.remove(codecList.size()-1); 068 } 069 } 070 } 071 } 072 } 073 074 return codec; 075 } 076 077 private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) { 078 if (codec != null) { 079 Class<T> codecClass = ReflectionUtils.getClass(codec); 080 synchronized (pool) { 081 if (!pool.containsKey(codecClass)) { 082 pool.put(codecClass, new ArrayList<T>()); 083 } 084 085 List<T> codecList = pool.get(codecClass); 086 synchronized (codecList) { 087 codecList.add(codec); 088 } 089 } 090 } 091 } 092 093 /** 094 * Get a {@link Compressor} for the given {@link CompressionCodec} from the 095 * pool or a new one. 096 * 097 * @param codec the <code>CompressionCodec</code> for which to get the 098 * <code>Compressor</code> 099 * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor 100 * @return <code>Compressor</code> for the given 101 * <code>CompressionCodec</code> from the pool or a new one 102 */ 103 public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { 104 Compressor compressor = borrow(compressorPool, codec.getCompressorType()); 105 if (compressor == null) { 106 compressor = codec.createCompressor(); 107 LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]"); 108 } else { 109 compressor.reinit(conf); 110 if(LOG.isDebugEnabled()) { 111 LOG.debug("Got recycled compressor"); 112 } 113 } 114 return compressor; 115 } 116 117 public static Compressor getCompressor(CompressionCodec codec) { 118 return getCompressor(codec, null); 119 } 120 121 /** 122 * Get a {@link Decompressor} for the given {@link CompressionCodec} from the 123 * pool or a new one. 124 * 125 * @param codec the <code>CompressionCodec</code> for which to get the 126 * <code>Decompressor</code> 127 * @return <code>Decompressor</code> for the given 128 * <code>CompressionCodec</code> the pool or a new one 129 */ 130 public static Decompressor getDecompressor(CompressionCodec codec) { 131 Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType()); 132 if (decompressor == null) { 133 decompressor = codec.createDecompressor(); 134 LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]"); 135 } else { 136 if(LOG.isDebugEnabled()) { 137 LOG.debug("Got recycled decompressor"); 138 } 139 } 140 return decompressor; 141 } 142 143 /** 144 * Return the {@link Compressor} to the pool. 145 * 146 * @param compressor the <code>Compressor</code> to be returned to the pool 147 */ 148 public static void returnCompressor(Compressor compressor) { 149 if (compressor == null) { 150 return; 151 } 152 // if the compressor can't be reused, don't pool it. 153 if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { 154 return; 155 } 156 compressor.reset(); 157 payback(compressorPool, compressor); 158 } 159 160 /** 161 * Return the {@link Decompressor} to the pool. 162 * 163 * @param decompressor the <code>Decompressor</code> to be returned to the 164 * pool 165 */ 166 public static void returnDecompressor(Decompressor decompressor) { 167 if (decompressor == null) { 168 return; 169 } 170 // if the decompressor can't be reused, don't pool it. 171 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { 172 return; 173 } 174 decompressor.reset(); 175 payback(decompressorPool, decompressor); 176 } 177}