001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.io.InputStream;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.io.compress.Decompressor;
028
029@InterfaceAudience.Public
030@InterfaceStability.Evolving
031public class DecompressorStream extends CompressionInputStream {
032  protected Decompressor decompressor = null;
033  protected byte[] buffer;
034  protected boolean eof = false;
035  protected boolean closed = false;
036  private int lastBytesSent = 0;
037
038  public DecompressorStream(InputStream in, Decompressor decompressor,
039                            int bufferSize)
040  throws IOException {
041    super(in);
042
043    if (in == null || decompressor == null) {
044      throw new NullPointerException();
045    } else if (bufferSize <= 0) {
046      throw new IllegalArgumentException("Illegal bufferSize");
047    }
048
049    this.decompressor = decompressor;
050    buffer = new byte[bufferSize];
051  }
052
053  public DecompressorStream(InputStream in, Decompressor decompressor)
054  throws IOException {
055    this(in, decompressor, 512);
056  }
057
058  /**
059   * Allow derived classes to directly set the underlying stream.
060   * 
061   * @param in Underlying input stream.
062   * @throws IOException
063   */
064  protected DecompressorStream(InputStream in) throws IOException {
065    super(in);
066  }
067  
068  private byte[] oneByte = new byte[1];
069  public int read() throws IOException {
070    checkStream();
071    return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
072  }
073
074  public int read(byte[] b, int off, int len) throws IOException {
075    checkStream();
076    
077    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
078      throw new IndexOutOfBoundsException();
079    } else if (len == 0) {
080      return 0;
081    }
082
083    return decompress(b, off, len);
084  }
085
086  protected int decompress(byte[] b, int off, int len) throws IOException {
087    int n = 0;
088
089    while ((n = decompressor.decompress(b, off, len)) == 0) {
090      if (decompressor.needsDictionary()) {
091        eof = true;
092        return -1;
093      }
094
095      if (decompressor.finished()) {
096        // First see if there was any leftover buffered input from previous
097        // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
098        // all done; else reset, fix up input buffer, and get ready for next
099        // concatenated substream/"member".
100        int nRemaining = decompressor.getRemaining();
101        if (nRemaining == 0) {
102          int m = getCompressedData();
103          if (m == -1) {
104            // apparently the previous end-of-stream was also end-of-file:
105            // return success, as if we had never called getCompressedData()
106            eof = true;
107            return -1;
108          }
109          decompressor.reset();
110          decompressor.setInput(buffer, 0, m);
111          lastBytesSent = m;
112        } else {
113          // looks like it's a concatenated stream:  reset low-level zlib (or
114          // other engine) and buffers, then "resend" remaining input data
115          decompressor.reset();
116          int leftoverOffset = lastBytesSent - nRemaining;
117          assert (leftoverOffset >= 0);
118          // this recopies userBuf -> direct buffer if using native libraries:
119          decompressor.setInput(buffer, leftoverOffset, nRemaining);
120          // NOTE:  this is the one place we do NOT want to save the number
121          // of bytes sent (nRemaining here) into lastBytesSent:  since we
122          // are resending what we've already sent before, offset is nonzero
123          // in general (only way it could be zero is if it already equals
124          // nRemaining), which would then screw up the offset calculation
125          // _next_ time around.  IOW, getRemaining() is in terms of the
126          // original, zero-offset bufferload, so lastBytesSent must be as
127          // well.  Cheesy ASCII art:
128          //
129          //          <------------ m, lastBytesSent ----------->
130          //          +===============================================+
131          // buffer:  |1111111111|22222222222222222|333333333333|     |
132          //          +===============================================+
133          //     #1:  <-- off -->|<-------- nRemaining --------->
134          //     #2:  <----------- off ----------->|<-- nRem. -->
135          //     #3:  (final substream:  nRemaining == 0; eof = true)
136          //
137          // If lastBytesSent is anything other than m, as shown, then "off"
138          // will be calculated incorrectly.
139        }
140      } else if (decompressor.needsInput()) {
141        int m = getCompressedData();
142        if (m == -1) {
143          throw new EOFException("Unexpected end of input stream");
144        }
145        decompressor.setInput(buffer, 0, m);
146        lastBytesSent = m;
147      }
148    }
149
150    return n;
151  }
152
153  protected int getCompressedData() throws IOException {
154    checkStream();
155  
156    // note that the _caller_ is now required to call setInput() or throw
157    return in.read(buffer, 0, buffer.length);
158  }
159
160  protected void checkStream() throws IOException {
161    if (closed) {
162      throw new IOException("Stream closed");
163    }
164  }
165  
166  public void resetState() throws IOException {
167    decompressor.reset();
168  }
169
170  private byte[] skipBytes = new byte[512];
171  public long skip(long n) throws IOException {
172    // Sanity checks
173    if (n < 0) {
174      throw new IllegalArgumentException("negative skip length");
175    }
176    checkStream();
177    
178    // Read 'n' bytes
179    int skipped = 0;
180    while (skipped < n) {
181      int len = Math.min(((int)n - skipped), skipBytes.length);
182      len = read(skipBytes, 0, len);
183      if (len == -1) {
184        eof = true;
185        break;
186      }
187      skipped += len;
188    }
189    return skipped;
190  }
191
192  public int available() throws IOException {
193    checkStream();
194    return (eof) ? 0 : 1;
195  }
196
197  public void close() throws IOException {
198    if (!closed) {
199      in.close();
200      closed = true;
201    }
202  }
203
204  public boolean markSupported() {
205    return false;
206  }
207
208  public synchronized void mark(int readlimit) {
209  }
210
211  public synchronized void reset() throws IOException {
212    throw new IOException("mark/reset not supported");
213  }
214
215}