001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.compress; 020 021import java.io.EOFException; 022import java.io.IOException; 023import java.io.InputStream; 024 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceStability; 027import org.apache.hadoop.io.compress.Decompressor; 028 029@InterfaceAudience.Public 030@InterfaceStability.Evolving 031public class DecompressorStream extends CompressionInputStream { 032 protected Decompressor decompressor = null; 033 protected byte[] buffer; 034 protected boolean eof = false; 035 protected boolean closed = false; 036 private int lastBytesSent = 0; 037 038 public DecompressorStream(InputStream in, Decompressor decompressor, 039 int bufferSize) 040 throws IOException { 041 super(in); 042 043 if (decompressor == null) { 044 throw new NullPointerException(); 045 } else if (bufferSize <= 0) { 046 throw new IllegalArgumentException("Illegal bufferSize"); 047 } 048 049 this.decompressor = decompressor; 050 buffer = new byte[bufferSize]; 051 } 052 053 public DecompressorStream(InputStream in, Decompressor decompressor) 054 throws IOException { 055 this(in, decompressor, 512); 056 } 057 058 /** 059 * Allow derived classes to directly set the underlying stream. 060 * 061 * @param in Underlying input stream. 062 * @throws IOException 063 */ 064 protected DecompressorStream(InputStream in) throws IOException { 065 super(in); 066 } 067 068 private byte[] oneByte = new byte[1]; 069 @Override 070 public int read() throws IOException { 071 checkStream(); 072 return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff); 073 } 074 075 @Override 076 public int read(byte[] b, int off, int len) throws IOException { 077 checkStream(); 078 079 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { 080 throw new IndexOutOfBoundsException(); 081 } else if (len == 0) { 082 return 0; 083 } 084 085 return decompress(b, off, len); 086 } 087 088 protected int decompress(byte[] b, int off, int len) throws IOException { 089 int n = 0; 090 091 while ((n = decompressor.decompress(b, off, len)) == 0) { 092 if (decompressor.needsDictionary()) { 093 eof = true; 094 return -1; 095 } 096 097 if (decompressor.finished()) { 098 // First see if there was any leftover buffered input from previous 099 // stream; if not, attempt to refill buffer. If refill -> EOF, we're 100 // all done; else reset, fix up input buffer, and get ready for next 101 // concatenated substream/"member". 102 int nRemaining = decompressor.getRemaining(); 103 if (nRemaining == 0) { 104 int m = getCompressedData(); 105 if (m == -1) { 106 // apparently the previous end-of-stream was also end-of-file: 107 // return success, as if we had never called getCompressedData() 108 eof = true; 109 return -1; 110 } 111 decompressor.reset(); 112 decompressor.setInput(buffer, 0, m); 113 lastBytesSent = m; 114 } else { 115 // looks like it's a concatenated stream: reset low-level zlib (or 116 // other engine) and buffers, then "resend" remaining input data 117 decompressor.reset(); 118 int leftoverOffset = lastBytesSent - nRemaining; 119 assert (leftoverOffset >= 0); 120 // this recopies userBuf -> direct buffer if using native libraries: 121 decompressor.setInput(buffer, leftoverOffset, nRemaining); 122 // NOTE: this is the one place we do NOT want to save the number 123 // of bytes sent (nRemaining here) into lastBytesSent: since we 124 // are resending what we've already sent before, offset is nonzero 125 // in general (only way it could be zero is if it already equals 126 // nRemaining), which would then screw up the offset calculation 127 // _next_ time around. IOW, getRemaining() is in terms of the 128 // original, zero-offset bufferload, so lastBytesSent must be as 129 // well. Cheesy ASCII art: 130 // 131 // <------------ m, lastBytesSent -----------> 132 // +===============================================+ 133 // buffer: |1111111111|22222222222222222|333333333333| | 134 // +===============================================+ 135 // #1: <-- off -->|<-------- nRemaining ---------> 136 // #2: <----------- off ----------->|<-- nRem. --> 137 // #3: (final substream: nRemaining == 0; eof = true) 138 // 139 // If lastBytesSent is anything other than m, as shown, then "off" 140 // will be calculated incorrectly. 141 } 142 } else if (decompressor.needsInput()) { 143 int m = getCompressedData(); 144 if (m == -1) { 145 throw new EOFException("Unexpected end of input stream"); 146 } 147 decompressor.setInput(buffer, 0, m); 148 lastBytesSent = m; 149 } 150 } 151 152 return n; 153 } 154 155 protected int getCompressedData() throws IOException { 156 checkStream(); 157 158 // note that the _caller_ is now required to call setInput() or throw 159 return in.read(buffer, 0, buffer.length); 160 } 161 162 protected void checkStream() throws IOException { 163 if (closed) { 164 throw new IOException("Stream closed"); 165 } 166 } 167 168 @Override 169 public void resetState() throws IOException { 170 decompressor.reset(); 171 } 172 173 private byte[] skipBytes = new byte[512]; 174 @Override 175 public long skip(long n) throws IOException { 176 // Sanity checks 177 if (n < 0) { 178 throw new IllegalArgumentException("negative skip length"); 179 } 180 checkStream(); 181 182 // Read 'n' bytes 183 int skipped = 0; 184 while (skipped < n) { 185 int len = Math.min(((int)n - skipped), skipBytes.length); 186 len = read(skipBytes, 0, len); 187 if (len == -1) { 188 eof = true; 189 break; 190 } 191 skipped += len; 192 } 193 return skipped; 194 } 195 196 @Override 197 public int available() throws IOException { 198 checkStream(); 199 return (eof) ? 0 : 1; 200 } 201 202 @Override 203 public void close() throws IOException { 204 if (!closed) { 205 try { 206 super.close(); 207 } finally { 208 closed = true; 209 } 210 } 211 } 212 213 @Override 214 public boolean markSupported() { 215 return false; 216 } 217 218 @Override 219 public synchronized void mark(int readlimit) { 220 } 221 222 @Override 223 public synchronized void reset() throws IOException { 224 throw new IOException("mark/reset not supported"); 225 } 226 227}