001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.io.InputStream;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.io.compress.Decompressor;
028
029@InterfaceAudience.Public
030@InterfaceStability.Evolving
031public class DecompressorStream extends CompressionInputStream {
032  protected Decompressor decompressor = null;
033  protected byte[] buffer;
034  protected boolean eof = false;
035  protected boolean closed = false;
036  private int lastBytesSent = 0;
037
038  public DecompressorStream(InputStream in, Decompressor decompressor,
039                            int bufferSize)
040  throws IOException {
041    super(in);
042
043    if (decompressor == null) {
044      throw new NullPointerException();
045    } else if (bufferSize <= 0) {
046      throw new IllegalArgumentException("Illegal bufferSize");
047    }
048
049    this.decompressor = decompressor;
050    buffer = new byte[bufferSize];
051  }
052
053  public DecompressorStream(InputStream in, Decompressor decompressor)
054  throws IOException {
055    this(in, decompressor, 512);
056  }
057
058  /**
059   * Allow derived classes to directly set the underlying stream.
060   * 
061   * @param in Underlying input stream.
062   * @throws IOException
063   */
064  protected DecompressorStream(InputStream in) throws IOException {
065    super(in);
066  }
067  
068  private byte[] oneByte = new byte[1];
069  @Override
070  public int read() throws IOException {
071    checkStream();
072    return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
073  }
074
075  @Override
076  public int read(byte[] b, int off, int len) throws IOException {
077    checkStream();
078    
079    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
080      throw new IndexOutOfBoundsException();
081    } else if (len == 0) {
082      return 0;
083    }
084
085    return decompress(b, off, len);
086  }
087
088  protected int decompress(byte[] b, int off, int len) throws IOException {
089    int n = 0;
090
091    while ((n = decompressor.decompress(b, off, len)) == 0) {
092      if (decompressor.needsDictionary()) {
093        eof = true;
094        return -1;
095      }
096
097      if (decompressor.finished()) {
098        // First see if there was any leftover buffered input from previous
099        // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
100        // all done; else reset, fix up input buffer, and get ready for next
101        // concatenated substream/"member".
102        int nRemaining = decompressor.getRemaining();
103        if (nRemaining == 0) {
104          int m = getCompressedData();
105          if (m == -1) {
106            // apparently the previous end-of-stream was also end-of-file:
107            // return success, as if we had never called getCompressedData()
108            eof = true;
109            return -1;
110          }
111          decompressor.reset();
112          decompressor.setInput(buffer, 0, m);
113          lastBytesSent = m;
114        } else {
115          // looks like it's a concatenated stream:  reset low-level zlib (or
116          // other engine) and buffers, then "resend" remaining input data
117          decompressor.reset();
118          int leftoverOffset = lastBytesSent - nRemaining;
119          assert (leftoverOffset >= 0);
120          // this recopies userBuf -> direct buffer if using native libraries:
121          decompressor.setInput(buffer, leftoverOffset, nRemaining);
122          // NOTE:  this is the one place we do NOT want to save the number
123          // of bytes sent (nRemaining here) into lastBytesSent:  since we
124          // are resending what we've already sent before, offset is nonzero
125          // in general (only way it could be zero is if it already equals
126          // nRemaining), which would then screw up the offset calculation
127          // _next_ time around.  IOW, getRemaining() is in terms of the
128          // original, zero-offset bufferload, so lastBytesSent must be as
129          // well.  Cheesy ASCII art:
130          //
131          //          <------------ m, lastBytesSent ----------->
132          //          +===============================================+
133          // buffer:  |1111111111|22222222222222222|333333333333|     |
134          //          +===============================================+
135          //     #1:  <-- off -->|<-------- nRemaining --------->
136          //     #2:  <----------- off ----------->|<-- nRem. -->
137          //     #3:  (final substream:  nRemaining == 0; eof = true)
138          //
139          // If lastBytesSent is anything other than m, as shown, then "off"
140          // will be calculated incorrectly.
141        }
142      } else if (decompressor.needsInput()) {
143        int m = getCompressedData();
144        if (m == -1) {
145          throw new EOFException("Unexpected end of input stream");
146        }
147        decompressor.setInput(buffer, 0, m);
148        lastBytesSent = m;
149      }
150    }
151
152    return n;
153  }
154
155  protected int getCompressedData() throws IOException {
156    checkStream();
157  
158    // note that the _caller_ is now required to call setInput() or throw
159    return in.read(buffer, 0, buffer.length);
160  }
161
162  protected void checkStream() throws IOException {
163    if (closed) {
164      throw new IOException("Stream closed");
165    }
166  }
167  
168  @Override
169  public void resetState() throws IOException {
170    decompressor.reset();
171  }
172
173  private byte[] skipBytes = new byte[512];
174  @Override
175  public long skip(long n) throws IOException {
176    // Sanity checks
177    if (n < 0) {
178      throw new IllegalArgumentException("negative skip length");
179    }
180    checkStream();
181    
182    // Read 'n' bytes
183    int skipped = 0;
184    while (skipped < n) {
185      int len = Math.min(((int)n - skipped), skipBytes.length);
186      len = read(skipBytes, 0, len);
187      if (len == -1) {
188        eof = true;
189        break;
190      }
191      skipped += len;
192    }
193    return skipped;
194  }
195
196  @Override
197  public int available() throws IOException {
198    checkStream();
199    return (eof) ? 0 : 1;
200  }
201
202  @Override
203  public void close() throws IOException {
204    if (!closed) {
205      try {
206        super.close();
207      } finally {
208        closed = true;
209      }
210    }
211  }
212
213  @Override
214  public boolean markSupported() {
215    return false;
216  }
217
218  @Override
219  public synchronized void mark(int readlimit) {
220  }
221
222  @Override
223  public synchronized void reset() throws IOException {
224    throw new IOException("mark/reset not supported");
225  }
226
227}