001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.compress; 020 021import java.io.EOFException; 022import java.io.IOException; 023import java.io.InputStream; 024 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceStability; 027import org.apache.hadoop.io.compress.Decompressor; 028 029@InterfaceAudience.Public 030@InterfaceStability.Evolving 031public class DecompressorStream extends CompressionInputStream { 032 protected Decompressor decompressor = null; 033 protected byte[] buffer; 034 protected boolean eof = false; 035 protected boolean closed = false; 036 private int lastBytesSent = 0; 037 038 public DecompressorStream(InputStream in, Decompressor decompressor, 039 int bufferSize) 040 throws IOException { 041 super(in); 042 043 if (in == null || decompressor == null) { 044 throw new NullPointerException(); 045 } else if (bufferSize <= 0) { 046 throw new IllegalArgumentException("Illegal bufferSize"); 047 } 048 049 this.decompressor = decompressor; 050 buffer = new byte[bufferSize]; 051 } 052 053 public DecompressorStream(InputStream in, Decompressor decompressor) 054 throws IOException { 055 this(in, decompressor, 512); 056 } 057 058 /** 059 * Allow derived classes to directly set the underlying stream. 060 * 061 * @param in Underlying input stream. 062 * @throws IOException 063 */ 064 protected DecompressorStream(InputStream in) throws IOException { 065 super(in); 066 } 067 068 private byte[] oneByte = new byte[1]; 069 public int read() throws IOException { 070 checkStream(); 071 return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff); 072 } 073 074 public int read(byte[] b, int off, int len) throws IOException { 075 checkStream(); 076 077 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { 078 throw new IndexOutOfBoundsException(); 079 } else if (len == 0) { 080 return 0; 081 } 082 083 return decompress(b, off, len); 084 } 085 086 protected int decompress(byte[] b, int off, int len) throws IOException { 087 int n = 0; 088 089 while ((n = decompressor.decompress(b, off, len)) == 0) { 090 if (decompressor.needsDictionary()) { 091 eof = true; 092 return -1; 093 } 094 095 if (decompressor.finished()) { 096 // First see if there was any leftover buffered input from previous 097 // stream; if not, attempt to refill buffer. If refill -> EOF, we're 098 // all done; else reset, fix up input buffer, and get ready for next 099 // concatenated substream/"member". 100 int nRemaining = decompressor.getRemaining(); 101 if (nRemaining == 0) { 102 int m = getCompressedData(); 103 if (m == -1) { 104 // apparently the previous end-of-stream was also end-of-file: 105 // return success, as if we had never called getCompressedData() 106 eof = true; 107 return -1; 108 } 109 decompressor.reset(); 110 decompressor.setInput(buffer, 0, m); 111 lastBytesSent = m; 112 } else { 113 // looks like it's a concatenated stream: reset low-level zlib (or 114 // other engine) and buffers, then "resend" remaining input data 115 decompressor.reset(); 116 int leftoverOffset = lastBytesSent - nRemaining; 117 assert (leftoverOffset >= 0); 118 // this recopies userBuf -> direct buffer if using native libraries: 119 decompressor.setInput(buffer, leftoverOffset, nRemaining); 120 // NOTE: this is the one place we do NOT want to save the number 121 // of bytes sent (nRemaining here) into lastBytesSent: since we 122 // are resending what we've already sent before, offset is nonzero 123 // in general (only way it could be zero is if it already equals 124 // nRemaining), which would then screw up the offset calculation 125 // _next_ time around. IOW, getRemaining() is in terms of the 126 // original, zero-offset bufferload, so lastBytesSent must be as 127 // well. Cheesy ASCII art: 128 // 129 // <------------ m, lastBytesSent -----------> 130 // +===============================================+ 131 // buffer: |1111111111|22222222222222222|333333333333| | 132 // +===============================================+ 133 // #1: <-- off -->|<-------- nRemaining ---------> 134 // #2: <----------- off ----------->|<-- nRem. --> 135 // #3: (final substream: nRemaining == 0; eof = true) 136 // 137 // If lastBytesSent is anything other than m, as shown, then "off" 138 // will be calculated incorrectly. 139 } 140 } else if (decompressor.needsInput()) { 141 int m = getCompressedData(); 142 if (m == -1) { 143 throw new EOFException("Unexpected end of input stream"); 144 } 145 decompressor.setInput(buffer, 0, m); 146 lastBytesSent = m; 147 } 148 } 149 150 return n; 151 } 152 153 protected int getCompressedData() throws IOException { 154 checkStream(); 155 156 // note that the _caller_ is now required to call setInput() or throw 157 return in.read(buffer, 0, buffer.length); 158 } 159 160 protected void checkStream() throws IOException { 161 if (closed) { 162 throw new IOException("Stream closed"); 163 } 164 } 165 166 public void resetState() throws IOException { 167 decompressor.reset(); 168 } 169 170 private byte[] skipBytes = new byte[512]; 171 public long skip(long n) throws IOException { 172 // Sanity checks 173 if (n < 0) { 174 throw new IllegalArgumentException("negative skip length"); 175 } 176 checkStream(); 177 178 // Read 'n' bytes 179 int skipped = 0; 180 while (skipped < n) { 181 int len = Math.min(((int)n - skipped), skipBytes.length); 182 len = read(skipBytes, 0, len); 183 if (len == -1) { 184 eof = true; 185 break; 186 } 187 skipped += len; 188 } 189 return skipped; 190 } 191 192 public int available() throws IOException { 193 checkStream(); 194 return (eof) ? 0 : 1; 195 } 196 197 public void close() throws IOException { 198 if (!closed) { 199 in.close(); 200 closed = true; 201 } 202 } 203 204 public boolean markSupported() { 205 return false; 206 } 207 208 public synchronized void mark(int readlimit) { 209 } 210 211 public synchronized void reset() throws IOException { 212 throw new IOException("mark/reset not supported"); 213 } 214 215}