001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.EOFException;
022    import java.io.IOException;
023    import java.io.InputStream;
024    
025    import org.apache.hadoop.classification.InterfaceAudience;
026    import org.apache.hadoop.classification.InterfaceStability;
027    import org.apache.hadoop.io.compress.Decompressor;
028    
029    @InterfaceAudience.Public
030    @InterfaceStability.Evolving
031    public class DecompressorStream extends CompressionInputStream {
032      protected Decompressor decompressor = null;
033      protected byte[] buffer;
034      protected boolean eof = false;
035      protected boolean closed = false;
036      private int lastBytesSent = 0;
037    
038      public DecompressorStream(InputStream in, Decompressor decompressor,
039                                int bufferSize)
040      throws IOException {
041        super(in);
042    
043        if (in == null || decompressor == null) {
044          throw new NullPointerException();
045        } else if (bufferSize <= 0) {
046          throw new IllegalArgumentException("Illegal bufferSize");
047        }
048    
049        this.decompressor = decompressor;
050        buffer = new byte[bufferSize];
051      }
052    
053      public DecompressorStream(InputStream in, Decompressor decompressor)
054      throws IOException {
055        this(in, decompressor, 512);
056      }
057    
058      /**
059       * Allow derived classes to directly set the underlying stream.
060       * 
061       * @param in Underlying input stream.
062       * @throws IOException
063       */
064      protected DecompressorStream(InputStream in) throws IOException {
065        super(in);
066      }
067      
068      private byte[] oneByte = new byte[1];
069      @Override
070      public int read() throws IOException {
071        checkStream();
072        return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
073      }
074    
075      @Override
076      public int read(byte[] b, int off, int len) throws IOException {
077        checkStream();
078        
079        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
080          throw new IndexOutOfBoundsException();
081        } else if (len == 0) {
082          return 0;
083        }
084    
085        return decompress(b, off, len);
086      }
087    
088      protected int decompress(byte[] b, int off, int len) throws IOException {
089        int n = 0;
090    
091        while ((n = decompressor.decompress(b, off, len)) == 0) {
092          if (decompressor.needsDictionary()) {
093            eof = true;
094            return -1;
095          }
096    
097          if (decompressor.finished()) {
098            // First see if there was any leftover buffered input from previous
099            // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
100            // all done; else reset, fix up input buffer, and get ready for next
101            // concatenated substream/"member".
102            int nRemaining = decompressor.getRemaining();
103            if (nRemaining == 0) {
104              int m = getCompressedData();
105              if (m == -1) {
106                // apparently the previous end-of-stream was also end-of-file:
107                // return success, as if we had never called getCompressedData()
108                eof = true;
109                return -1;
110              }
111              decompressor.reset();
112              decompressor.setInput(buffer, 0, m);
113              lastBytesSent = m;
114            } else {
115              // looks like it's a concatenated stream:  reset low-level zlib (or
116              // other engine) and buffers, then "resend" remaining input data
117              decompressor.reset();
118              int leftoverOffset = lastBytesSent - nRemaining;
119              assert (leftoverOffset >= 0);
120              // this recopies userBuf -> direct buffer if using native libraries:
121              decompressor.setInput(buffer, leftoverOffset, nRemaining);
122              // NOTE:  this is the one place we do NOT want to save the number
123              // of bytes sent (nRemaining here) into lastBytesSent:  since we
124              // are resending what we've already sent before, offset is nonzero
125              // in general (only way it could be zero is if it already equals
126              // nRemaining), which would then screw up the offset calculation
127              // _next_ time around.  IOW, getRemaining() is in terms of the
128              // original, zero-offset bufferload, so lastBytesSent must be as
129              // well.  Cheesy ASCII art:
130              //
131              //          <------------ m, lastBytesSent ----------->
132              //          +===============================================+
133              // buffer:  |1111111111|22222222222222222|333333333333|     |
134              //          +===============================================+
135              //     #1:  <-- off -->|<-------- nRemaining --------->
136              //     #2:  <----------- off ----------->|<-- nRem. -->
137              //     #3:  (final substream:  nRemaining == 0; eof = true)
138              //
139              // If lastBytesSent is anything other than m, as shown, then "off"
140              // will be calculated incorrectly.
141            }
142          } else if (decompressor.needsInput()) {
143            int m = getCompressedData();
144            if (m == -1) {
145              throw new EOFException("Unexpected end of input stream");
146            }
147            decompressor.setInput(buffer, 0, m);
148            lastBytesSent = m;
149          }
150        }
151    
152        return n;
153      }
154    
155      protected int getCompressedData() throws IOException {
156        checkStream();
157      
158        // note that the _caller_ is now required to call setInput() or throw
159        return in.read(buffer, 0, buffer.length);
160      }
161    
162      protected void checkStream() throws IOException {
163        if (closed) {
164          throw new IOException("Stream closed");
165        }
166      }
167      
168      @Override
169      public void resetState() throws IOException {
170        decompressor.reset();
171      }
172    
173      private byte[] skipBytes = new byte[512];
174      @Override
175      public long skip(long n) throws IOException {
176        // Sanity checks
177        if (n < 0) {
178          throw new IllegalArgumentException("negative skip length");
179        }
180        checkStream();
181        
182        // Read 'n' bytes
183        int skipped = 0;
184        while (skipped < n) {
185          int len = Math.min(((int)n - skipped), skipBytes.length);
186          len = read(skipBytes, 0, len);
187          if (len == -1) {
188            eof = true;
189            break;
190          }
191          skipped += len;
192        }
193        return skipped;
194      }
195    
196      @Override
197      public int available() throws IOException {
198        checkStream();
199        return (eof) ? 0 : 1;
200      }
201    
202      @Override
203      public void close() throws IOException {
204        if (!closed) {
205          in.close();
206          closed = true;
207        }
208      }
209    
210      @Override
211      public boolean markSupported() {
212        return false;
213      }
214    
215      @Override
216      public synchronized void mark(int readlimit) {
217      }
218    
219      @Override
220      public synchronized void reset() throws IOException {
221        throw new IOException("mark/reset not supported");
222      }
223    
224    }