001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.*;
022    import java.util.zip.GZIPOutputStream;
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.io.compress.DefaultCodec;
027    import org.apache.hadoop.io.compress.zlib.*;
028    import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
029    
030    import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
031    
032    /**
033     * This class creates gzip compressors/decompressors. 
034     */
035    @InterfaceAudience.Public
036    @InterfaceStability.Evolving
037    public class GzipCodec extends DefaultCodec {
038      /**
039       * A bridge that wraps around a DeflaterOutputStream to make it 
040       * a CompressionOutputStream.
041       */
042      @InterfaceStability.Evolving
043      protected static class GzipOutputStream extends CompressorStream {
044    
045        private static class ResetableGZIPOutputStream extends GZIPOutputStream {
046          private static final int TRAILER_SIZE = 8;
047          public static final String JVMVersion= System.getProperty("java.version");
048          private static final boolean HAS_BROKEN_FINISH =
049              (IBM_JAVA && JVMVersion.contains("1.6.0"));
050    
051          public ResetableGZIPOutputStream(OutputStream out) throws IOException {
052            super(out);
053          }
054          
055          public void resetState() throws IOException {
056            def.reset();
057          }
058        }
059    
060        public GzipOutputStream(OutputStream out) throws IOException {
061          super(new ResetableGZIPOutputStream(out));
062        }
063        
064        /**
065         * Allow children types to put a different type in here.
066         * @param out the Deflater stream to use
067         */
068        protected GzipOutputStream(CompressorStream out) {
069          super(out);
070        }
071        
072        @Override
073        public void close() throws IOException {
074          out.close();
075        }
076        
077        @Override
078        public void flush() throws IOException {
079          out.flush();
080        }
081        
082        @Override
083        public void write(int b) throws IOException {
084          out.write(b);
085        }
086        
087        @Override
088        public void write(byte[] data, int offset, int length) 
089          throws IOException {
090          out.write(data, offset, length);
091        }
092        
093        @Override
094        public void finish() throws IOException {
095          ((ResetableGZIPOutputStream) out).finish();
096        }
097    
098        @Override
099        public void resetState() throws IOException {
100          ((ResetableGZIPOutputStream) out).resetState();
101        }
102      }
103    
104      @Override
105      public CompressionOutputStream createOutputStream(OutputStream out) 
106        throws IOException {
107        if (!ZlibFactory.isNativeZlibLoaded(conf)) {
108          return new GzipOutputStream(out);
109        }
110        return CompressionCodec.Util.
111            createOutputStreamWithCodecPool(this, conf, out);
112      }
113      
114      @Override
115      public CompressionOutputStream createOutputStream(OutputStream out, 
116                                                        Compressor compressor) 
117      throws IOException {
118        return (compressor != null) ?
119                   new CompressorStream(out, compressor,
120                                        conf.getInt("io.file.buffer.size", 
121                                                    4*1024)) :
122                   createOutputStream(out);
123      }
124    
125      @Override
126      public Compressor createCompressor() {
127        return (ZlibFactory.isNativeZlibLoaded(conf))
128          ? new GzipZlibCompressor(conf)
129          : null;
130      }
131    
132      @Override
133      public Class<? extends Compressor> getCompressorType() {
134        return ZlibFactory.isNativeZlibLoaded(conf)
135          ? GzipZlibCompressor.class
136          : null;
137      }
138    
139      @Override
140      public CompressionInputStream createInputStream(InputStream in)
141          throws IOException {
142        return CompressionCodec.Util.
143            createInputStreamWithCodecPool(this, conf, in);
144      }
145    
146      @Override
147      public CompressionInputStream createInputStream(InputStream in,
148                                                      Decompressor decompressor)
149      throws IOException {
150        if (decompressor == null) {
151          decompressor = createDecompressor();  // always succeeds (or throws)
152        }
153        return new DecompressorStream(in, decompressor,
154                                      conf.getInt("io.file.buffer.size", 4*1024));
155      }
156    
157      @Override
158      public Decompressor createDecompressor() {
159        return (ZlibFactory.isNativeZlibLoaded(conf))
160          ? new GzipZlibDecompressor()
161          : new BuiltInGzipDecompressor();
162      }
163    
164      @Override
165      public Class<? extends Decompressor> getDecompressorType() {
166        return ZlibFactory.isNativeZlibLoaded(conf)
167          ? GzipZlibDecompressor.class
168          : BuiltInGzipDecompressor.class;
169      }
170        
171      @Override
172      public DirectDecompressor createDirectDecompressor() {
173        return ZlibFactory.isNativeZlibLoaded(conf) 
174            ? new ZlibDecompressor.ZlibDirectDecompressor(
175              ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0) : null;
176      }
177    
178      @Override
179      public String getDefaultExtension() {
180        return ".gz";
181      }
182    
183      static final class GzipZlibCompressor extends ZlibCompressor {
184        public GzipZlibCompressor() {
185          super(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
186              ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
187              ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024);
188        }
189        
190        public GzipZlibCompressor(Configuration conf) {
191          super(ZlibFactory.getCompressionLevel(conf),
192               ZlibFactory.getCompressionStrategy(conf),
193               ZlibCompressor.CompressionHeader.GZIP_FORMAT,
194               64 * 1024);
195        }
196      }
197    
198      static final class GzipZlibDecompressor extends ZlibDecompressor {
199        public GzipZlibDecompressor() {
200          super(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 64*1024);
201        }
202      }
203    
204    }