001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.*;
022import java.util.zip.GZIPOutputStream;
023import org.apache.hadoop.classification.InterfaceAudience;
024import org.apache.hadoop.classification.InterfaceStability;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.io.compress.DefaultCodec;
027import org.apache.hadoop.io.compress.zlib.*;
028import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
029
030import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
031import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
032import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
033
034/**
035 * This class creates gzip compressors/decompressors. 
036 */
037@InterfaceAudience.Public
038@InterfaceStability.Evolving
039public class GzipCodec extends DefaultCodec {
040  /**
041   * A bridge that wraps around a DeflaterOutputStream to make it 
042   * a CompressionOutputStream.
043   */
044  @InterfaceStability.Evolving
045  protected static class GzipOutputStream extends CompressorStream {
046
047    private static class ResetableGZIPOutputStream extends GZIPOutputStream {
048      private static final int TRAILER_SIZE = 8;
049      public static final String JVMVersion= System.getProperty("java.version");
050      private static final boolean HAS_BROKEN_FINISH =
051          (IBM_JAVA && JVMVersion.contains("1.6.0"));
052
053      public ResetableGZIPOutputStream(OutputStream out) throws IOException {
054        super(out);
055      }
056      
057      public void resetState() throws IOException {
058        def.reset();
059      }
060    }
061
062    public GzipOutputStream(OutputStream out) throws IOException {
063      super(new ResetableGZIPOutputStream(out));
064    }
065    
066    /**
067     * Allow children types to put a different type in here.
068     * @param out the Deflater stream to use
069     */
070    protected GzipOutputStream(CompressorStream out) {
071      super(out);
072    }
073    
074    @Override
075    public void close() throws IOException {
076      out.close();
077    }
078    
079    @Override
080    public void flush() throws IOException {
081      out.flush();
082    }
083    
084    @Override
085    public void write(int b) throws IOException {
086      out.write(b);
087    }
088    
089    @Override
090    public void write(byte[] data, int offset, int length) 
091      throws IOException {
092      out.write(data, offset, length);
093    }
094    
095    @Override
096    public void finish() throws IOException {
097      ((ResetableGZIPOutputStream) out).finish();
098    }
099
100    @Override
101    public void resetState() throws IOException {
102      ((ResetableGZIPOutputStream) out).resetState();
103    }
104  }
105
106  @Override
107  public CompressionOutputStream createOutputStream(OutputStream out) 
108    throws IOException {
109    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
110      return new GzipOutputStream(out);
111    }
112    return CompressionCodec.Util.
113        createOutputStreamWithCodecPool(this, conf, out);
114  }
115  
116  @Override
117  public CompressionOutputStream createOutputStream(OutputStream out, 
118                                                    Compressor compressor) 
119  throws IOException {
120    return (compressor != null) ?
121               new CompressorStream(out, compressor,
122                                    conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
123                                            IO_FILE_BUFFER_SIZE_DEFAULT)) :
124               createOutputStream(out);
125  }
126
127  @Override
128  public Compressor createCompressor() {
129    return (ZlibFactory.isNativeZlibLoaded(conf))
130      ? new GzipZlibCompressor(conf)
131      : null;
132  }
133
134  @Override
135  public Class<? extends Compressor> getCompressorType() {
136    return ZlibFactory.isNativeZlibLoaded(conf)
137      ? GzipZlibCompressor.class
138      : null;
139  }
140
141  @Override
142  public CompressionInputStream createInputStream(InputStream in)
143      throws IOException {
144    return CompressionCodec.Util.
145        createInputStreamWithCodecPool(this, conf, in);
146  }
147
148  @Override
149  public CompressionInputStream createInputStream(InputStream in,
150                                                  Decompressor decompressor)
151  throws IOException {
152    if (decompressor == null) {
153      decompressor = createDecompressor();  // always succeeds (or throws)
154    }
155    return new DecompressorStream(in, decompressor,
156                                  conf.getInt(IO_FILE_BUFFER_SIZE_KEY,
157                                      IO_FILE_BUFFER_SIZE_DEFAULT));
158  }
159
160  @Override
161  public Decompressor createDecompressor() {
162    return (ZlibFactory.isNativeZlibLoaded(conf))
163      ? new GzipZlibDecompressor()
164      : new BuiltInGzipDecompressor();
165  }
166
167  @Override
168  public Class<? extends Decompressor> getDecompressorType() {
169    return ZlibFactory.isNativeZlibLoaded(conf)
170      ? GzipZlibDecompressor.class
171      : BuiltInGzipDecompressor.class;
172  }
173    
174  @Override
175  public DirectDecompressor createDirectDecompressor() {
176    return ZlibFactory.isNativeZlibLoaded(conf) 
177        ? new ZlibDecompressor.ZlibDirectDecompressor(
178          ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0) : null;
179  }
180
181  @Override
182  public String getDefaultExtension() {
183    return ".gz";
184  }
185
186  static final class GzipZlibCompressor extends ZlibCompressor {
187    public GzipZlibCompressor() {
188      super(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
189          ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
190          ZlibCompressor.CompressionHeader.GZIP_FORMAT, 64*1024);
191    }
192    
193    public GzipZlibCompressor(Configuration conf) {
194      super(ZlibFactory.getCompressionLevel(conf),
195           ZlibFactory.getCompressionStrategy(conf),
196           ZlibCompressor.CompressionHeader.GZIP_FORMAT,
197           64 * 1024);
198    }
199  }
200
201  static final class GzipZlibDecompressor extends ZlibDecompressor {
202    public GzipZlibDecompressor() {
203      super(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 64*1024);
204    }
205  }
206
207}