001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.io.InputStream;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027
028/**
029 * A {@link org.apache.hadoop.io.compress.DecompressorStream} which works
030 * with 'block-based' based compression algorithms, as opposed to 
031 * 'stream-based' compression algorithms.
032 *  
033 */
034@InterfaceAudience.Public
035@InterfaceStability.Evolving
036public class BlockDecompressorStream extends DecompressorStream {
037  private int originalBlockSize = 0;
038  private int noUncompressedBytes = 0;
039
040  /**
041   * Create a {@link BlockDecompressorStream}.
042   * 
043   * @param in input stream
044   * @param decompressor decompressor to use
045   * @param bufferSize size of buffer
046   * @throws IOException
047   */
048  public BlockDecompressorStream(InputStream in, Decompressor decompressor, 
049                                 int bufferSize) throws IOException {
050    super(in, decompressor, bufferSize);
051  }
052
053  /**
054   * Create a {@link BlockDecompressorStream}.
055   * 
056   * @param in input stream
057   * @param decompressor decompressor to use
058   * @throws IOException
059   */
060  public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
061    super(in, decompressor);
062  }
063
064  protected BlockDecompressorStream(InputStream in) throws IOException {
065    super(in);
066  }
067
068  @Override
069  protected int decompress(byte[] b, int off, int len) throws IOException {
070    // Check if we are the beginning of a block
071    if (noUncompressedBytes == originalBlockSize) {
072      // Get original data size
073      try {
074        originalBlockSize =  rawReadInt();
075      } catch (IOException ioe) {
076        return -1;
077      }
078      noUncompressedBytes = 0;
079      // EOF if originalBlockSize is 0
080      // This will occur only when decompressing previous compressed empty file
081      if (originalBlockSize == 0) {
082        eof = true;
083        return -1;
084      }
085    }
086
087    int n = 0;
088    while ((n = decompressor.decompress(b, off, len)) == 0) {
089      if (decompressor.finished() || decompressor.needsDictionary()) {
090        if (noUncompressedBytes >= originalBlockSize) {
091          eof = true;
092          return -1;
093        }
094      }
095      if (decompressor.needsInput()) {
096        int m;
097        try {
098          m = getCompressedData();
099        } catch (EOFException e) {
100          eof = true;
101          return -1;
102        }
103        // Send the read data to the decompressor
104        decompressor.setInput(buffer, 0, m);
105      }
106    }
107
108    // Note the no. of decompressed bytes read from 'current' block
109    noUncompressedBytes += n;
110
111    return n;
112  }
113
114  @Override
115  protected int getCompressedData() throws IOException {
116    checkStream();
117
118    // Get the size of the compressed chunk (always non-negative)
119    int len = rawReadInt();
120
121    // Read len bytes from underlying stream 
122    if (len > buffer.length) {
123      buffer = new byte[len];
124    }
125    int n = 0, off = 0;
126    while (n < len) {
127      int count = in.read(buffer, off + n, len - n);
128      if (count < 0) {
129        throw new EOFException("Unexpected end of block in input stream");
130      }
131      n += count;
132    }
133
134    return len;
135  }
136
137  @Override
138  public void resetState() throws IOException {
139    originalBlockSize = 0;
140    noUncompressedBytes = 0;
141    super.resetState();
142  }
143
144  private int rawReadInt() throws IOException {
145    int b1 = in.read();
146    int b2 = in.read();
147    int b3 = in.read();
148    int b4 = in.read();
149    if ((b1 | b2 | b3 | b4) < 0)
150      throw new EOFException();
151    return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
152  }
153}