001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.*;
022    import java.util.zip.GZIPOutputStream;
023    import org.apache.hadoop.classification.InterfaceAudience;
024    import org.apache.hadoop.classification.InterfaceStability;
025    import org.apache.hadoop.conf.Configuration;
026    import org.apache.hadoop.io.compress.DefaultCodec;
027    import org.apache.hadoop.io.compress.zlib.*;
028    import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
029    
030    import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
031    
032    /**
033     * This class creates gzip compressors/decompressors. 
034     */
035    @InterfaceAudience.Public
036    @InterfaceStability.Evolving
037    public class GzipCodec extends DefaultCodec {
038      /**
039       * A bridge that wraps around a DeflaterOutputStream to make it 
040       * a CompressionOutputStream.
041       */
042      @InterfaceStability.Evolving
043      protected static class GzipOutputStream extends CompressorStream {
044    
045        private static class ResetableGZIPOutputStream extends GZIPOutputStream {
046          private static final int TRAILER_SIZE = 8;
047          public static final String JVMVersion= System.getProperty("java.version");
048          private static final boolean HAS_BROKEN_FINISH =
049              (IBM_JAVA && JVMVersion.contains("1.6.0"));
050    
051          public ResetableGZIPOutputStream(OutputStream out) throws IOException {
052            super(out);
053          }
054          
055          public void resetState() throws IOException {
056            def.reset();
057          }
058        }
059    
060        public GzipOutputStream(OutputStream out) throws IOException {
061          super(new ResetableGZIPOutputStream(out));
062        }
063        
064        /**
065         * Allow children types to put a different type in here.
066         * @param out the Deflater stream to use
067         */
068        protected GzipOutputStream(CompressorStream out) {
069          super(out);
070        }
071        
072        @Override
073        public void close() throws IOException {
074          out.close();
075        }
076        
077        @Override
078        public void flush() throws IOException {
079          out.flush();
080        }
081        
082        @Override
083        public void write(int b) throws IOException {
084          out.write(b);
085        }
086        
087        @Override
088        public void write(byte[] data, int offset, int length) 
089          throws IOException {
090          out.write(data, offset, length);
091        }
092        
093        @Override
094        public void finish() throws IOException {
095          ((ResetableGZIPOutputStream) out).finish();
096        }
097    
098        @Override
099        public void resetState() throws IOException {
100          ((ResetableGZIPOutputStream) out).resetState();
101        }
102      }
103    
104      @Override
105      public CompressionOutputStream createOutputStream(OutputStream out) 
106        throws IOException {
107        return (ZlibFactory.isNativeZlibLoaded(conf)) ?
108                   new CompressorStream(out, createCompressor(),
109                                        conf.getInt("io.file.buffer.size", 4*1024)) :
110                   new GzipOutputStream(out);
111      }
112      
113      @Override
114      public CompressionOutputStream createOutputStream(OutputStream out, 
115                                                        Compressor compressor) 
116      throws IOException {
117        return (compressor != null) ?
118                   new CompressorStream(out, compressor,
119                                        conf.getInt("io.file.buffer.size", 
120                                                    4*1024)) :
121                   createOutputStream(out);
122      }
123    
124      @Override
125      public Compressor createCompressor() {
126        return (ZlibFactory.isNativeZlibLoaded(conf))
127          ? new GzipZlibCompressor(conf)
128          : null;
129      }
130    
131      @Override
132      public Class<? extends Compressor> getCompressorType() {
133        return ZlibFactory.isNativeZlibLoaded(conf)
134          ? GzipZlibCompressor.class
135          : null;
136      }
137    
138      @Override
139      public CompressionInputStream createInputStream(InputStream in)
140      throws IOException {
141        return createInputStream(in, null);
142      }
143    
144      @Override
145      public CompressionInputStream createInputStream(InputStream in,
146                                                      Decompressor decompressor)
147      throws IOException {
148        if (decompressor == null) {
149          decompressor = createDecompressor();  // always succeeds (or throws)
150        }
151        return new DecompressorStream(in, decompressor,
152                                      conf.getInt("io.file.buffer.size", 4*1024));
153      }
154    
155      @Override
156      public Decompressor createDecompressor() {
157        return (ZlibFactory.isNativeZlibLoaded(conf))
158          ? new GzipZlibDecompressor()
159          : new BuiltInGzipDecompressor();
160      }
161    
162      @Override
163      public Class<? extends Decompressor> getDecompressorType() {
164        return ZlibFactory.isNativeZlibLoaded(conf)
165          ? GzipZlibDecompressor.class
166          : BuiltInGzipDecompressor.class;
167      }
168        
169      @Override
170      public DirectDecompressor createDirectDecompressor() {
171        return ZlibFactory.isNativeZlibLoaded(conf) 
172            ? new ZlibDecompressor.ZlibDirectDecompressor(
173              ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0) : null;
174      }
175    
176      @Override
177      public String getDefaultExtension() {
178        return ".gz";
179      }
180    
181      static final class GzipZlibCompressor extends ZlibCompressor {
182        public GzipZlibCompressor() {
183          super(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
184              ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
185              ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024);
186        }
187        
188        public GzipZlibCompressor(Configuration conf) {
189          super(ZlibFactory.getCompressionLevel(conf),
190               ZlibFactory.getCompressionStrategy(conf),
191               ZlibCompressor.CompressionHeader.GZIP_FORMAT,
192               64 * 1024);
193        }
194      }
195    
196      static final class GzipZlibDecompressor extends ZlibDecompressor {
197        public GzipZlibDecompressor() {
198          super(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 64*1024);
199        }
200      }
201    
202    }