/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A {@link CompressionInputStream} that reads compressed bytes from an
 * underlying {@link InputStream} into an internal buffer and hands them to a
 * {@link Decompressor} to produce the uncompressed data returned by
 * {@code read}.
 *
 * <p>When the decompressor reports that it has finished but input remains
 * (either left over in the buffer or still available from the underlying
 * stream), the decompressor is reset and decoding continues with the next
 * concatenated substream.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DecompressorStream extends CompressionInputStream {
  /**
   * The maximum input buffer size; used as the input buffer size when the
   * caller does not supply one.
   */
  private static final int MAX_INPUT_BUFFER_SIZE = 512;
  /**
   * MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to
   * use when skipping. See {@link java.io.InputStream}.
   */
  private static final int MAX_SKIP_BUFFER_SIZE = 2048;

  /** Scratch buffer that receives (and discards) data read by {@link #skip}. */
  private byte[] skipBytes;
  /** Scratch buffer backing the single-byte {@link #read()}. */
  private byte[] oneByte = new byte[1];

  protected Decompressor decompressor = null;
  /** Holds compressed bytes read from the underlying stream. */
  protected byte[] buffer;
  protected boolean eof = false;
  protected boolean closed = false;
  /**
   * Number of bytes handed to the decompressor by the most recent fresh
   * (offset zero) {@code setInput} call; see {@link #decompress}.
   */
  private int lastBytesSent = 0;

  /**
   * Creates a stream with explicit input and skip buffer sizes.
   *
   * @param in underlying stream of compressed data
   * @param decompressor decompressor to use; must not be null
   * @param bufferSize size of the compressed-input buffer; must be positive
   * @param skipBufferSize size of the scratch buffer used by {@link #skip}
   * @throws NullPointerException if {@code decompressor} is null
   * @throws IllegalArgumentException if {@code bufferSize} is not positive
   * @throws IOException if thrown by the superclass constructor
   */
  @VisibleForTesting
  DecompressorStream(InputStream in, Decompressor decompressor,
                     int bufferSize, int skipBufferSize)
      throws IOException {
    super(in);

    if (decompressor == null) {
      throw new NullPointerException("decompressor is null");
    } else if (bufferSize <= 0) {
      throw new IllegalArgumentException("Illegal bufferSize");
    }

    this.decompressor = decompressor;
    buffer = new byte[bufferSize];
    skipBytes = new byte[skipBufferSize];
  }

  /**
   * Creates a stream with the given input buffer size and the default skip
   * buffer size.
   *
   * @param in underlying stream of compressed data
   * @param decompressor decompressor to use; must not be null
   * @param bufferSize size of the compressed-input buffer; must be positive
   * @throws IOException if thrown by the superclass constructor
   */
  public DecompressorStream(InputStream in, Decompressor decompressor,
                            int bufferSize)
      throws IOException {
    this(in, decompressor, bufferSize, MAX_SKIP_BUFFER_SIZE);
  }

  /**
   * Creates a stream with the default input and skip buffer sizes.
   *
   * @param in underlying stream of compressed data
   * @param decompressor decompressor to use; must not be null
   * @throws IOException if thrown by the superclass constructor
   */
  public DecompressorStream(InputStream in, Decompressor decompressor)
      throws IOException {
    this(in, decompressor, MAX_INPUT_BUFFER_SIZE);
  }

  /**
   * Allow derived classes to directly set the underlying stream.
   *
   * <p>This constructor does not initialize {@link #decompressor} or
   * {@link #buffer}; the derived class is responsible for them.
   *
   * @param in Underlying input stream.
   * @throws IOException if thrown by the superclass constructor
   */
  protected DecompressorStream(InputStream in) throws IOException {
    super(in);
  }

  /**
   * Reads a single uncompressed byte.
   *
   * @return the byte as a value in {@code 0..255}, or -1 at end of stream
   * @throws IOException if the stream is closed or reading fails
   */
  @Override
  public int read() throws IOException {
    checkStream();
    return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
  }

  /**
   * Reads up to {@code len} uncompressed bytes into {@code b}.
   *
   * @param b destination array
   * @param off offset in {@code b} at which to start storing bytes
   * @param len maximum number of bytes to read
   * @return the number of bytes read, 0 if {@code len} is 0, or -1 at end of
   *         stream
   * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
   *         describe a valid range of {@code b}
   * @throws IOException if the stream is closed or reading fails
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    checkStream();

    // A single sign test: any negative operand (including an overflowed
    // off + len, or a range that runs past the end of b) sets the sign bit.
    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return 0;
    }

    return decompress(b, off, len);
  }

  /**
   * Decompresses data into {@code b}, refilling the decompressor from the
   * underlying stream as needed and restarting it on concatenated substreams.
   *
   * @param b destination array
   * @param off offset in {@code b} at which to start storing bytes
   * @param len maximum number of bytes to produce
   * @return the number of bytes produced, or -1 at end of stream (or when the
   *         decompressor requires a dictionary)
   * @throws EOFException if the underlying stream ends while the decompressor
   *         still needs input
   * @throws IOException if the stream is closed or reading fails
   */
  protected int decompress(byte[] b, int off, int len) throws IOException {
    int n;

    while ((n = decompressor.decompress(b, off, len)) == 0) {
      if (decompressor.needsDictionary()) {
        eof = true;
        return -1;
      }

      if (decompressor.finished()) {
        // First see if there was any leftover buffered input from previous
        // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
        // all done; else reset, fix up input buffer, and get ready for next
        // concatenated substream/"member".
        int nRemaining = decompressor.getRemaining();
        if (nRemaining == 0) {
          int m = getCompressedData();
          if (m == -1) {
            // apparently the previous end-of-stream was also end-of-file:
            // return success, as if we had never called getCompressedData()
            eof = true;
            return -1;
          }
          decompressor.reset();
          decompressor.setInput(buffer, 0, m);
          lastBytesSent = m;
        } else {
          // looks like it's a concatenated stream:  reset low-level zlib (or
          // other engine) and buffers, then "resend" remaining input data
          decompressor.reset();
          int leftoverOffset = lastBytesSent - nRemaining;
          assert (leftoverOffset >= 0);
          // this recopies userBuf -> direct buffer if using native libraries:
          decompressor.setInput(buffer, leftoverOffset, nRemaining);
          // NOTE:  this is the one place we do NOT want to save the number
          // of bytes sent (nRemaining here) into lastBytesSent:  since we
          // are resending what we've already sent before, offset is nonzero
          // in general (only way it could be zero is if it already equals
          // nRemaining), which would then screw up the offset calculation
          // _next_ time around.  IOW, getRemaining() is in terms of the
          // original, zero-offset bufferload, so lastBytesSent must be as
          // well.  Cheesy ASCII art:
          //
          //          <------------ m, lastBytesSent ----------->
          //          +===============================================+
          // buffer:  |1111111111|22222222222222222|333333333333|     |
          //          +===============================================+
          //     #1:  <-- off -->|<-------- nRemaining --------->
          //     #2:  <----------- off ----------->|<-- nRem. -->
          //     #3:  (final substream:  nRemaining == 0; eof = true)
          //
          // If lastBytesSent is anything other than m, as shown, then "off"
          // will be calculated incorrectly.
        }
      } else if (decompressor.needsInput()) {
        int m = getCompressedData();
        if (m == -1) {
          throw new EOFException("Unexpected end of input stream");
        }
        decompressor.setInput(buffer, 0, m);
        lastBytesSent = m;
      }
    }

    return n;
  }

  /**
   * Reads the next chunk of compressed data from the underlying stream into
   * {@link #buffer}.
   *
   * @return the number of bytes read into {@code buffer}, or -1 if the
   *         underlying stream is at end of file
   * @throws IOException if the stream is closed or reading fails
   */
  protected int getCompressedData() throws IOException {
    checkStream();

    // note that the _caller_ is now required to call setInput() or throw
    return in.read(buffer, 0, buffer.length);
  }

  /**
   * Verifies that this stream has not been closed.
   *
   * @throws IOException if the stream is closed
   */
  protected void checkStream() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }
  }

  /**
   * Resets the decompressor to its initial state.
   *
   * @throws IOException declared by the overridden method
   */
  @Override
  public void resetState() throws IOException {
    decompressor.reset();
  }

  /**
   * Skips up to {@code n} bytes of uncompressed data by reading and
   * discarding them.
   *
   * @param n number of uncompressed bytes to skip; must not be negative
   * @return the number of bytes actually skipped, which is less than
   *         {@code n} if end of stream is reached first
   * @throws IllegalArgumentException if {@code n} is negative
   * @throws IOException if the stream is closed or reading fails
   */
  @Override
  public long skip(long n) throws IOException {
    // Sanity checks
    if (n < 0) {
      throw new IllegalArgumentException("negative skip length");
    }
    checkStream();

    // The protected single-argument constructor never allocates the skip
    // buffer, and a zero-length buffer could never make progress; fall back
    // to the default size in both cases.
    if (skipBytes == null || skipBytes.length == 0) {
      skipBytes = new byte[MAX_SKIP_BUFFER_SIZE];
    }

    // Read 'n' bytes.  The count is kept as a long and the chunk size is
    // clamped before narrowing, so requests above Integer.MAX_VALUE are safe.
    long skipped = 0;
    while (skipped < n) {
      int len = (int) Math.min(n - skipped, skipBytes.length);
      len = read(skipBytes, 0, len);
      if (len == -1) {
        eof = true;
        break;
      }
      skipped += len;
    }
    return skipped;
  }

  /**
   * Indicates whether more data may be available.
   *
   * @return 0 once end of stream has been seen, otherwise 1
   * @throws IOException if the stream is closed
   */
  @Override
  public int available() throws IOException {
    checkStream();
    return (eof) ? 0 : 1;
  }

  /**
   * Closes the underlying stream. Calling this on an already closed stream
   * has no effect.
   *
   * @throws IOException if closing the underlying stream fails; the stream
   *         is still marked closed in that case
   */
  @Override
  public void close() throws IOException {
    if (!closed) {
      try {
        in.close();
      } finally {
        // Mark closed even if the underlying close fails, so later calls
        // neither retry the close nor treat the stream as usable.
        closed = true;
      }
    }
  }

  /**
   * Mark/reset is not supported by this stream.
   *
   * @return {@code false}
   */
  @Override
  public boolean markSupported() {
    return false;
  }

  /**
   * Does nothing; mark/reset is not supported.
   *
   * @param readlimit ignored
   */
  @Override
  public synchronized void mark(int readlimit) {
  }

  /**
   * Always fails; mark/reset is not supported.
   *
   * @throws IOException always
   */
  @Override
  public synchronized void reset() throws IOException {
    throw new IOException("mark/reset not supported");
  }

}