/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A {@link org.apache.hadoop.io.compress.CompressorStream} which works
 * with 'block-based' compression algorithms, as opposed to
 * 'stream-based' compression algorithms.
 *
 * Note that this wrapper does not guarantee that blocks will be sized
 * for the compressor. If the
 * {@link org.apache.hadoop.io.compress.Compressor} requires buffering to
 * effect meaningful compression, it is responsible for providing that
 * buffering itself.
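 *
 * <p>A minimal usage sketch; here {@code codec} stands for any
 * block-oriented {@code CompressionCodec} (e.g. {@code SnappyCodec}),
 * and {@code rawOut}, {@code data} and the buffer sizes are
 * illustrative assumptions only:</p>
 * <pre>
 *   Compressor compressor = codec.createCompressor();
 *   BlockCompressorStream out = new BlockCompressorStream(
 *       rawOut, compressor, 64 * 1024, 64);
 *   out.write(data, 0, data.length);
 *   out.finish();
 *   out.close();
 * </pre>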
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BlockCompressorStream extends CompressorStream {

  // The 'maximum' size of input data to be compressed, to account
  // for the overhead of the compression algorithm.
  private final int MAX_INPUT_SIZE;

  /**
   * Create a {@link BlockCompressorStream}.
   *
   * @param out stream
   * @param compressor compressor to be used
   * @param bufferSize size of buffer
   * @param compressionOverhead maximum 'overhead' of the compression
   *                            algorithm with given bufferSize
   */
  public BlockCompressorStream(OutputStream out, Compressor compressor,
                               int bufferSize, int compressionOverhead) {
    super(out, compressor, bufferSize);
    MAX_INPUT_SIZE = bufferSize - compressionOverhead;
  }

  /**
   * Create a {@link BlockCompressorStream} with the given output stream and
   * compressor.
   * Uses a default bufferSize of 512 bytes and a compressionOverhead of
   * 18 bytes: 1% of bufferSize (rounded up to 6) plus 12 bytes, per the
   * zlib algorithm.
   *
   * @param out stream
   * @param compressor compressor to be used
   */
  public BlockCompressorStream(OutputStream out, Compressor compressor) {
    this(out, compressor, 512, 18);
  }

  /**
   * Writes the given data to the compression codec, compressing no more
   * than the buffer size less the compression overhead (as specified at
   * construction) in each block.
   *
   * Each block contains the uncompressed length for the block, followed by
   * one or more length-prefixed chunks of compressed data.
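   *
   * <p>Concretely, every length below is a 4-byte big-endian int, as
   * written by {@code rawWriteInt}:</p>
   * <pre>
   *   [uncompressed length]
   *     [compressed length][compressed data]
   *     [compressed length][compressed data] ...
   * </pre>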
   */
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    // Sanity checks
    if (compressor.finished()) {
      throw new IOException("write beyond end of stream");
    }
    if (b == null) {
      throw new NullPointerException();
    } else if ((off < 0) || (off > b.length) || (len < 0) ||
               ((off + len) > b.length)) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return;
    }

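    // Number of uncompressed bytes the compressor has already buffered
    // for the current block.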
    long limlen = compressor.getBytesRead();
    if (len + limlen > MAX_INPUT_SIZE && limlen > 0) {
      // Adding this segment would exceed the maximum size.
      // Flush data if we have it.
      finish();
      compressor.reset();
    }

    if (len > MAX_INPUT_SIZE) {
      // The data we're given exceeds the maximum size. Any data we
      // already had has been flushed above, so we write out this chunk
      // in segments not exceeding the maximum size until it is exhausted.
      rawWriteInt(len);
      do {
        int bufLen = Math.min(len, MAX_INPUT_SIZE);

        compressor.setInput(b, off, bufLen);
        compressor.finish();
        while (!compressor.finished()) {
          compress();
        }
        compressor.reset();
        off += bufLen;
        len -= bufLen;
      } while (len > 0);
      return;
    }

    // Give data to the compressor
    compressor.setInput(b, off, len);
    if (!compressor.needsInput()) {
      // The compressor's internal buffer may be smaller than the maximum
      // block size, so let it flush as required.
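      // Header: the number of uncompressed bytes buffered for this block.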
      rawWriteInt((int)compressor.getBytesRead());
      do {
        compress();
      } while (!compressor.needsInput());
    }
  }

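  /**
   * Writes the uncompressed length of any data still buffered in the
   * compressor, then compresses and drains it, emitting length-prefixed
   * chunks until the compressor reports that it has finished.
   */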
  @Override
  public void finish() throws IOException {
    if (!compressor.finished()) {
      rawWriteInt((int)compressor.getBytesRead());
      compressor.finish();
      while (!compressor.finished()) {
        compress();
      }
    }
  }

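  /**
   * Compresses whatever input the compressor currently holds into the
   * internal buffer and, if any output was produced, writes it to the
   * underlying stream as a single length-prefixed chunk.
   */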
  @Override
  protected void compress() throws IOException {
    int len = compressor.compress(buffer, 0, buffer.length);
    if (len > 0) {
      // Write out the compressed chunk
      rawWriteInt(len);
      out.write(buffer, 0, len);
    }
  }

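  /**
   * Writes a 4-byte big-endian (network byte order) integer directly to
   * the underlying stream, bypassing compression.
   */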
  private void rawWriteInt(int v) throws IOException {
    out.write((v >>> 24) & 0xFF);
    out.write((v >>> 16) & 0xFF);
    out.write((v >>>  8) & 0xFF);
    out.write((v >>>  0) & 0xFF);
  }

}