001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.IOException;
022import java.io.OutputStream;
023
024import org.apache.hadoop.classification.InterfaceAudience;
025import org.apache.hadoop.classification.InterfaceStability;
026
027/**
028 * A {@link org.apache.hadoop.io.compress.CompressorStream} which works
029 * with 'block-based' based compression algorithms, as opposed to 
030 * 'stream-based' compression algorithms.
031 *
032 * It should be noted that this wrapper does not guarantee that blocks will
033 * be sized for the compressor. If the
034 * {@link org.apache.hadoop.io.compress.Compressor} requires buffering to
035 * effect meaningful compression, it is responsible for it.
036 */
037@InterfaceAudience.Public
038@InterfaceStability.Evolving
039public class BlockCompressorStream extends CompressorStream {
040
041  // The 'maximum' size of input data to be compressed, to account
042  // for the overhead of the compression algorithm.
043  private final int MAX_INPUT_SIZE;
044
045  /**
046   * Create a {@link BlockCompressorStream}.
047   * 
048   * @param out stream
049   * @param compressor compressor to be used
050   * @param bufferSize size of buffer
051   * @param compressionOverhead maximum 'overhead' of the compression 
052   *                            algorithm with given bufferSize
053   */
054  public BlockCompressorStream(OutputStream out, Compressor compressor, 
055                               int bufferSize, int compressionOverhead) {
056    super(out, compressor, bufferSize);
057    MAX_INPUT_SIZE = bufferSize - compressionOverhead;
058  }
059
060  /**
061   * Create a {@link BlockCompressorStream} with given output-stream and 
062   * compressor.
063   * Use default of 512 as bufferSize and compressionOverhead of 
064   * (1% of bufferSize + 12 bytes) =  18 bytes (zlib algorithm).
065   * 
066   * @param out stream
067   * @param compressor compressor to be used
068   */
069  public BlockCompressorStream(OutputStream out, Compressor compressor) {
070    this(out, compressor, 512, 18);
071  }
072
073  /**
074   * Write the data provided to the compression codec, compressing no more
075   * than the buffer size less the compression overhead as specified during
076   * construction for each block.
077   *
078   * Each block contains the uncompressed length for the block, followed by
079   * one or more length-prefixed blocks of compressed data.
080   */
081  @Override
082  public void write(byte[] b, int off, int len) throws IOException {
083    // Sanity checks
084    if (compressor.finished()) {
085      throw new IOException("write beyond end of stream");
086    }
087    if (b == null) {
088      throw new NullPointerException();
089    } else if ((off < 0) || (off > b.length) || (len < 0) ||
090               ((off + len) > b.length)) {
091      throw new IndexOutOfBoundsException();
092    } else if (len == 0) {
093      return;
094    }
095
096    long limlen = compressor.getBytesRead();
097    if (len + limlen > MAX_INPUT_SIZE && limlen > 0) {
098      // Adding this segment would exceed the maximum size.
099      // Flush data if we have it.
100      finish();
101      compressor.reset();
102    }
103
104    if (len > MAX_INPUT_SIZE) {
105      // The data we're given exceeds the maximum size. Any data
106      // we had have been flushed, so we write out this chunk in segments
107      // not exceeding the maximum size until it is exhausted.
108      rawWriteInt(len);
109      do {
110        int bufLen = Math.min(len, MAX_INPUT_SIZE);
111        
112        compressor.setInput(b, off, bufLen);
113        compressor.finish();
114        while (!compressor.finished()) {
115          compress();
116        }
117        compressor.reset();
118        off += bufLen;
119        len -= bufLen;
120      } while (len > 0);
121      return;
122    }
123
124    // Give data to the compressor
125    compressor.setInput(b, off, len);
126    if (!compressor.needsInput()) {
127      // compressor buffer size might be smaller than the maximum
128      // size, so we permit it to flush if required.
129      rawWriteInt((int)compressor.getBytesRead());
130      do {
131        compress();
132      } while (!compressor.needsInput());
133    }
134  }
135
136  @Override
137  public void finish() throws IOException {
138    if (!compressor.finished()) {
139      rawWriteInt((int)compressor.getBytesRead());
140      compressor.finish();
141      while (!compressor.finished()) {
142        compress();
143      }
144    }
145  }
146
147  @Override
148  protected void compress() throws IOException {
149    int len = compressor.compress(buffer, 0, buffer.length);
150    if (len > 0) {
151      // Write out the compressed chunk
152      rawWriteInt(len);
153      out.write(buffer, 0, len);
154    }
155  }
156  
157  private void rawWriteInt(int v) throws IOException {
158    out.write((v >>> 24) & 0xFF);
159    out.write((v >>> 16) & 0xFF);
160    out.write((v >>>  8) & 0xFF);
161    out.write((v >>>  0) & 0xFF);
162  }
163
164}