001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.io.InputStream;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027
028/**
029 * A {@link org.apache.hadoop.io.compress.DecompressorStream} which works
030 * with 'block-based' based compression algorithms, as opposed to 
031 * 'stream-based' compression algorithms.
032 *  
033 */
034@InterfaceAudience.Public
035@InterfaceStability.Evolving
036public class BlockDecompressorStream extends DecompressorStream {
037  private int originalBlockSize = 0;
038  private int noUncompressedBytes = 0;
039
040  /**
041   * Create a {@link BlockDecompressorStream}.
042   * 
043   * @param in input stream
044   * @param decompressor decompressor to use
045   * @param bufferSize size of buffer
046   * @throws IOException
047   */
048  public BlockDecompressorStream(InputStream in, Decompressor decompressor, 
049                                 int bufferSize) throws IOException {
050    super(in, decompressor, bufferSize);
051  }
052
053  /**
054   * Create a {@link BlockDecompressorStream}.
055   * 
056   * @param in input stream
057   * @param decompressor decompressor to use
058   * @throws IOException
059   */
060  public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
061    super(in, decompressor);
062  }
063
064  protected BlockDecompressorStream(InputStream in) throws IOException {
065    super(in);
066  }
067
068  protected int decompress(byte[] b, int off, int len) throws IOException {
069    // Check if we are the beginning of a block
070    if (noUncompressedBytes == originalBlockSize) {
071      // Get original data size
072      try {
073        originalBlockSize =  rawReadInt();
074      } catch (IOException ioe) {
075        return -1;
076      }
077      noUncompressedBytes = 0;
078      // EOF if originalBlockSize is 0
079      // This will occur only when decompressing previous compressed empty file
080      if (originalBlockSize == 0) {
081        eof = true;
082        return -1;
083      }
084    }
085
086    int n = 0;
087    while ((n = decompressor.decompress(b, off, len)) == 0) {
088      if (decompressor.finished() || decompressor.needsDictionary()) {
089        if (noUncompressedBytes >= originalBlockSize) {
090          eof = true;
091          return -1;
092        }
093      }
094      if (decompressor.needsInput()) {
095        int m = getCompressedData();
096        // Send the read data to the decompressor
097        decompressor.setInput(buffer, 0, m);
098      }
099    }
100
101    // Note the no. of decompressed bytes read from 'current' block
102    noUncompressedBytes += n;
103
104    return n;
105  }
106
107  protected int getCompressedData() throws IOException {
108    checkStream();
109
110    // Get the size of the compressed chunk (always non-negative)
111    int len = rawReadInt();
112
113    // Read len bytes from underlying stream 
114    if (len > buffer.length) {
115      buffer = new byte[len];
116    }
117    int n = 0, off = 0;
118    while (n < len) {
119      int count = in.read(buffer, off + n, len - n);
120      if (count < 0) {
121        throw new EOFException("Unexpected end of block in input stream");
122      }
123      n += count;
124    }
125
126    return len;
127  }
128
129  public void resetState() throws IOException {
130    originalBlockSize = 0;
131    noUncompressedBytes = 0;
132    super.resetState();
133  }
134
135  private int rawReadInt() throws IOException {
136    int b1 = in.read();
137    int b2 = in.read();
138    int b3 = in.read();
139    int b4 = in.read();
140    if ((b1 | b2 | b3 | b4) < 0)
141      throw new EOFException();
142    return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
143  }
144}