001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.io.InputStream;
024
025import com.google.common.annotations.VisibleForTesting;
026import org.apache.hadoop.classification.InterfaceAudience;
027import org.apache.hadoop.classification.InterfaceStability;
028
029@InterfaceAudience.Public
030@InterfaceStability.Evolving
031public class DecompressorStream extends CompressionInputStream {
032  /**
033   * The maximum input buffer size.
034   */
035  private static final int MAX_INPUT_BUFFER_SIZE = 512;
036  /**
037   * MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to
038   * use when skipping. See {@link java.io.InputStream}.
039   */
040  private static final int MAX_SKIP_BUFFER_SIZE = 2048;
041
042  private byte[] skipBytes;
043  private byte[] oneByte = new byte[1];
044
045  protected Decompressor decompressor = null;
046  protected byte[] buffer;
047  protected boolean eof = false;
048  protected boolean closed = false;
049  private int lastBytesSent = 0;
050
051  @VisibleForTesting
052  DecompressorStream(InputStream in, Decompressor decompressor,
053                            int bufferSize, int skipBufferSize)
054      throws IOException {
055    super(in);
056
057    if (decompressor == null) {
058      throw new NullPointerException();
059    } else if (bufferSize <= 0) {
060      throw new IllegalArgumentException("Illegal bufferSize");
061    }
062
063    this.decompressor = decompressor;
064    buffer = new byte[bufferSize];
065    skipBytes = new byte[skipBufferSize];
066  }
067
068  public DecompressorStream(InputStream in, Decompressor decompressor,
069                            int bufferSize)
070      throws IOException {
071    this(in, decompressor, bufferSize, MAX_SKIP_BUFFER_SIZE);
072  }
073
074  public DecompressorStream(InputStream in, Decompressor decompressor)
075      throws IOException {
076    this(in, decompressor, MAX_INPUT_BUFFER_SIZE);
077  }
078
079  /**
080   * Allow derived classes to directly set the underlying stream.
081   * 
082   * @param in Underlying input stream.
083   * @throws IOException
084   */
085  protected DecompressorStream(InputStream in) throws IOException {
086    super(in);
087  }
088
089  @Override
090  public int read() throws IOException {
091    checkStream();
092    return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
093  }
094
095  @Override
096  public int read(byte[] b, int off, int len) throws IOException {
097    checkStream();
098    
099    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
100      throw new IndexOutOfBoundsException();
101    } else if (len == 0) {
102      return 0;
103    }
104
105    return decompress(b, off, len);
106  }
107
108  protected int decompress(byte[] b, int off, int len) throws IOException {
109    int n;
110
111    while ((n = decompressor.decompress(b, off, len)) == 0) {
112      if (decompressor.needsDictionary()) {
113        eof = true;
114        return -1;
115      }
116
117      if (decompressor.finished()) {
118        // First see if there was any leftover buffered input from previous
119        // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
120        // all done; else reset, fix up input buffer, and get ready for next
121        // concatenated substream/"member".
122        int nRemaining = decompressor.getRemaining();
123        if (nRemaining == 0) {
124          int m = getCompressedData();
125          if (m == -1) {
126            // apparently the previous end-of-stream was also end-of-file:
127            // return success, as if we had never called getCompressedData()
128            eof = true;
129            return -1;
130          }
131          decompressor.reset();
132          decompressor.setInput(buffer, 0, m);
133          lastBytesSent = m;
134        } else {
135          // looks like it's a concatenated stream:  reset low-level zlib (or
136          // other engine) and buffers, then "resend" remaining input data
137          decompressor.reset();
138          int leftoverOffset = lastBytesSent - nRemaining;
139          assert (leftoverOffset >= 0);
140          // this recopies userBuf -> direct buffer if using native libraries:
141          decompressor.setInput(buffer, leftoverOffset, nRemaining);
142          // NOTE:  this is the one place we do NOT want to save the number
143          // of bytes sent (nRemaining here) into lastBytesSent:  since we
144          // are resending what we've already sent before, offset is nonzero
145          // in general (only way it could be zero is if it already equals
146          // nRemaining), which would then screw up the offset calculation
147          // _next_ time around.  IOW, getRemaining() is in terms of the
148          // original, zero-offset bufferload, so lastBytesSent must be as
149          // well.  Cheesy ASCII art:
150          //
151          //          <------------ m, lastBytesSent ----------->
152          //          +===============================================+
153          // buffer:  |1111111111|22222222222222222|333333333333|     |
154          //          +===============================================+
155          //     #1:  <-- off -->|<-------- nRemaining --------->
156          //     #2:  <----------- off ----------->|<-- nRem. -->
157          //     #3:  (final substream:  nRemaining == 0; eof = true)
158          //
159          // If lastBytesSent is anything other than m, as shown, then "off"
160          // will be calculated incorrectly.
161        }
162      } else if (decompressor.needsInput()) {
163        int m = getCompressedData();
164        if (m == -1) {
165          throw new EOFException("Unexpected end of input stream");
166        }
167        decompressor.setInput(buffer, 0, m);
168        lastBytesSent = m;
169      }
170    }
171
172    return n;
173  }
174
175  protected int getCompressedData() throws IOException {
176    checkStream();
177  
178    // note that the _caller_ is now required to call setInput() or throw
179    return in.read(buffer, 0, buffer.length);
180  }
181
182  protected void checkStream() throws IOException {
183    if (closed) {
184      throw new IOException("Stream closed");
185    }
186  }
187  
188  @Override
189  public void resetState() throws IOException {
190    decompressor.reset();
191  }
192
193  @Override
194  public long skip(long n) throws IOException {
195    // Sanity checks
196    if (n < 0) {
197      throw new IllegalArgumentException("negative skip length");
198    }
199    checkStream();
200
201    // Read 'n' bytes
202    int skipped = 0;
203    while (skipped < n) {
204      int len = Math.min(((int)n - skipped), skipBytes.length);
205      len = read(skipBytes, 0, len);
206      if (len == -1) {
207        eof = true;
208        break;
209      }
210      skipped += len;
211    }
212    return skipped;
213  }
214
215  @Override
216  public int available() throws IOException {
217    checkStream();
218    return (eof) ? 0 : 1;
219  }
220
221  @Override
222  public void close() throws IOException {
223    if (!closed) {
224      in.close();
225      closed = true;
226    }
227  }
228
229  @Override
230  public boolean markSupported() {
231    return false;
232  }
233
234  @Override
235  public synchronized void mark(int readlimit) {
236  }
237
238  @Override
239  public synchronized void reset() throws IOException {
240    throw new IOException("mark/reset not supported");
241  }
242
243}