001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.EOFException;
022    import java.io.IOException;
023    import java.io.InputStream;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    
028    /**
029     * A {@link org.apache.hadoop.io.compress.DecompressorStream} which works
030     * with 'block-based' based compression algorithms, as opposed to 
031     * 'stream-based' compression algorithms.
032     *  
033     */
034    @InterfaceAudience.Public
035    @InterfaceStability.Evolving
036    public class BlockDecompressorStream extends DecompressorStream {
037      private int originalBlockSize = 0;
038      private int noUncompressedBytes = 0;
039    
040      /**
041       * Create a {@link BlockDecompressorStream}.
042       * 
043       * @param in input stream
044       * @param decompressor decompressor to use
045       * @param bufferSize size of buffer
046       * @throws IOException
047       */
048      public BlockDecompressorStream(InputStream in, Decompressor decompressor, 
049                                     int bufferSize) throws IOException {
050        super(in, decompressor, bufferSize);
051      }
052    
053      /**
054       * Create a {@link BlockDecompressorStream}.
055       * 
056       * @param in input stream
057       * @param decompressor decompressor to use
058       * @throws IOException
059       */
060      public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
061        super(in, decompressor);
062      }
063    
064      protected BlockDecompressorStream(InputStream in) throws IOException {
065        super(in);
066      }
067    
068      @Override
069      protected int decompress(byte[] b, int off, int len) throws IOException {
070        // Check if we are the beginning of a block
071        if (noUncompressedBytes == originalBlockSize) {
072          // Get original data size
073          try {
074            originalBlockSize =  rawReadInt();
075          } catch (IOException ioe) {
076            return -1;
077          }
078          noUncompressedBytes = 0;
079          // EOF if originalBlockSize is 0
080          // This will occur only when decompressing previous compressed empty file
081          if (originalBlockSize == 0) {
082            eof = true;
083            return -1;
084          }
085        }
086    
087        int n = 0;
088        while ((n = decompressor.decompress(b, off, len)) == 0) {
089          if (decompressor.finished() || decompressor.needsDictionary()) {
090            if (noUncompressedBytes >= originalBlockSize) {
091              eof = true;
092              return -1;
093            }
094          }
095          if (decompressor.needsInput()) {
096            int m;
097            try {
098              m = getCompressedData();
099            } catch (EOFException e) {
100              eof = true;
101              return -1;
102            }
103            // Send the read data to the decompressor
104            decompressor.setInput(buffer, 0, m);
105          }
106        }
107    
108        // Note the no. of decompressed bytes read from 'current' block
109        noUncompressedBytes += n;
110    
111        return n;
112      }
113    
114      @Override
115      protected int getCompressedData() throws IOException {
116        checkStream();
117    
118        // Get the size of the compressed chunk (always non-negative)
119        int len = rawReadInt();
120    
121        // Read len bytes from underlying stream 
122        if (len > buffer.length) {
123          buffer = new byte[len];
124        }
125        int n = 0, off = 0;
126        while (n < len) {
127          int count = in.read(buffer, off + n, len - n);
128          if (count < 0) {
129            throw new EOFException("Unexpected end of block in input stream");
130          }
131          n += count;
132        }
133    
134        return len;
135      }
136    
137      @Override
138      public void resetState() throws IOException {
139        originalBlockSize = 0;
140        noUncompressedBytes = 0;
141        super.resetState();
142      }
143    
144      private int rawReadInt() throws IOException {
145        int b1 = in.read();
146        int b2 = in.read();
147        int b3 = in.read();
148        int b4 = in.read();
149        if ((b1 | b2 | b3 | b4) < 0)
150          throw new EOFException();
151        return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
152      }
153    }