/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A {@link org.apache.hadoop.io.compress.DecompressorStream} which works
 * with 'block-based' compression algorithms, as opposed to
 * 'stream-based' compression algorithms.
 *
 * <p>As read by this class, the input is a sequence of blocks. Each block
 * starts with a 4-byte big-endian integer holding the original
 * (uncompressed) size of the block, followed by compressed chunks, each of
 * which is prefixed by a 4-byte big-endian integer holding the chunk length.
 * A chunk is fetched whenever the decompressor reports that it needs
 * input.</p>
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BlockDecompressorStream extends DecompressorStream {
  /** Uncompressed size of the block currently being decoded. */
  private int originalBlockSize = 0;

  /** Number of uncompressed bytes produced so far from the current block. */
  private int noUncompressedBytes = 0;

  /**
   * Create a {@link BlockDecompressorStream}.
   *
   * @param in input stream
   * @param decompressor decompressor to use
   * @param bufferSize size of buffer
   * @throws IOException propagated from the superclass constructor
   */
  public BlockDecompressorStream(InputStream in, Decompressor decompressor,
                                 int bufferSize) throws IOException {
    super(in, decompressor, bufferSize);
  }

  /**
   * Create a {@link BlockDecompressorStream}.
   *
   * @param in input stream
   * @param decompressor decompressor to use
   * @throws IOException propagated from the superclass constructor
   */
  public BlockDecompressorStream(InputStream in, Decompressor decompressor)
      throws IOException {
    super(in, decompressor);
  }

  /**
   * Create a {@link BlockDecompressorStream} through the single-argument
   * superclass constructor.
   *
   * @param in input stream
   * @throws IOException propagated from the superclass constructor
   */
  protected BlockDecompressorStream(InputStream in) throws IOException {
    super(in);
  }

  /**
   * Decompress data from the current block into {@code b}.
   *
   * <p>When the previous block has been fully consumed (or no block has been
   * started yet), the original size of the next block is read first.</p>
   *
   * @param b destination buffer
   * @param off offset in {@code b} at which to start storing bytes
   * @param len maximum number of bytes to produce
   * @return the number of bytes written to {@code b}, or {@code -1} when no
   *         more data is available
   * @throws IOException if fetching a compressed chunk or decompressing fails
   */
  protected int decompress(byte[] b, int off, int len) throws IOException {
    // Check if we are at the beginning of a block
    if (noUncompressedBytes == originalBlockSize) {
      // Get original data size
      try {
        originalBlockSize = rawReadInt();
      } catch (IOException ioe) {
        // Deliberately lenient: any failure to read the next block header is
        // reported as end of data. Note that 'eof' is not set on this path.
        return -1;
      }
      noUncompressedBytes = 0;
      // EOF if originalBlockSize is 0
      // This will occur only when decompressing previous compressed empty file
      if (originalBlockSize == 0) {
        eof = true;
        return -1;
      }
    }

    int n = 0;
    while ((n = decompressor.decompress(b, off, len)) == 0) {
      if (decompressor.finished() || decompressor.needsDictionary()) {
        // The decompressor has nothing more to give; stop once the whole
        // block has been accounted for.
        if (noUncompressedBytes >= originalBlockSize) {
          eof = true;
          return -1;
        }
      }
      if (decompressor.needsInput()) {
        int m = getCompressedData();
        // Send the read data to the decompressor
        decompressor.setInput(buffer, 0, m);
      }
    }

    // Note the no. of decompressed bytes read from 'current' block
    noUncompressedBytes += n;

    return n;
  }

  /**
   * Read the next length-prefixed compressed chunk from the underlying
   * stream into {@code buffer}, growing {@code buffer} if the chunk does not
   * fit.
   *
   * @return the length of the chunk now stored at the start of
   *         {@code buffer}
   * @throws EOFException if the stream ends before the length prefix or the
   *         full chunk has been read
   * @throws IOException if the length prefix is negative (corrupt input) or
   *         the underlying read fails
   */
  protected int getCompressedData() throws IOException {
    checkStream();

    // Get the size of the compressed chunk; a well-formed stream never
    // stores a negative value here, so treat one as corruption instead of
    // handing a negative length to the decompressor.
    int len = rawReadInt();
    if (len < 0) {
      throw new IOException("Invalid compressed chunk length: " + len);
    }

    // Read len bytes from underlying stream
    if (len > buffer.length) {
      buffer = new byte[len];
    }
    int n = 0;
    while (n < len) {
      int count = in.read(buffer, n, len - n);
      if (count < 0) {
        throw new EOFException("Unexpected end of block in input stream");
      }
      n += count;
    }

    return len;
  }

  /**
   * Forget the current block position and then reset the superclass state.
   *
   * @throws IOException propagated from {@code super.resetState()}
   */
  public void resetState() throws IOException {
    originalBlockSize = 0;
    noUncompressedBytes = 0;
    super.resetState();
  }

  /**
   * Read a 4-byte big-endian integer directly from the underlying stream.
   *
   * @return the decoded integer
   * @throws EOFException if the stream ends before four bytes were read
   * @throws IOException if the underlying read fails
   */
  private int rawReadInt() throws IOException {
    int b1 = in.read();
    int b2 = in.read();
    int b3 = in.read();
    int b4 = in.read();
    // read() yields 0..255 or -1, so the OR is negative iff any read hit EOF.
    if ((b1 | b2 | b3 | b4) < 0) {
      throw new EOFException();
    }
    return ((b1 << 24) + (b2 << 16) + (b3 << 8) + (b4 << 0));
  }
}