/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A {@link org.apache.hadoop.io.compress.DecompressorStream} which works
 * with 'block-based' compression algorithms, as opposed to
 * 'stream-based' compression algorithms.
 *
 * <p>The input is read as a sequence of blocks. Each block starts with a
 * 4-byte big-endian int holding the block's total uncompressed size,
 * followed by compressed chunks. Each chunk is prefixed by a 4-byte
 * big-endian int holding its compressed length. Chunks are consumed until
 * the number of decompressed bytes reaches the block's uncompressed size,
 * at which point the next block header is read.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BlockDecompressorStream extends DecompressorStream {
  /** Uncompressed size of the current block, as read from its header. */
  private int originalBlockSize = 0;
  /** Number of uncompressed bytes produced so far from the current block. */
  private int noUncompressedBytes = 0;

  /**
   * Create a {@link BlockDecompressorStream}.
   *
   * @param in input stream
   * @param decompressor decompressor to use
   * @param bufferSize size of buffer
   * @throws IOException if the {@link DecompressorStream} constructor throws
   */
  public BlockDecompressorStream(InputStream in, Decompressor decompressor,
                                 int bufferSize) throws IOException {
    super(in, decompressor, bufferSize);
  }

  /**
   * Create a {@link BlockDecompressorStream}.
   *
   * @param in input stream
   * @param decompressor decompressor to use
   * @throws IOException if the {@link DecompressorStream} constructor throws
   */
  public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
    super(in, decompressor);
  }

  /**
   * Create a {@link BlockDecompressorStream} without supplying a
   * decompressor here; intended for subclasses.
   *
   * @param in input stream
   * @throws IOException if the {@link DecompressorStream} constructor throws
   */
  protected BlockDecompressorStream(InputStream in) throws IOException {
    super(in);
  }

  /**
   * Decompress up to {@code len} bytes into {@code b} starting at {@code off}.
   *
   * <p>If the previous block has been fully consumed, the next block's header
   * (its uncompressed size) is read first. Compressed chunks are then fed to
   * the decompressor until it produces output.
   *
   * @param b destination buffer
   * @param off offset in {@code b} at which to start writing
   * @param len maximum number of bytes to write
   * @return the number of bytes written, or -1 at end of stream
   * @throws IOException if reading the underlying stream fails, or if a
   *         block header holds a negative uncompressed size
   */
  @Override
  protected int decompress(byte[] b, int off, int len) throws IOException {
    // Check if we are at the beginning of a block
    if (noUncompressedBytes == originalBlockSize) {
      // Get original data size
      try {
        originalBlockSize = rawReadInt();
      } catch (EOFException e) {
        // The stream ended where a new block header was expected. Only EOF
        // is treated as end of data here; any other IOException (such as a
        // failed read) propagates instead of silently truncating the output.
        // Note that rawReadInt also throws EOFException for a header cut
        // short after 1-3 bytes, so that case is reported as EOF too.
        eof = true;
        return -1;
      }
      if (originalBlockSize < 0) {
        throw new IOException(
            "Corrupt block header: negative uncompressed block size "
            + originalBlockSize);
      }
      noUncompressedBytes = 0;
      // EOF if originalBlockSize is 0
      // This will occur only when decompressing previous compressed empty file
      if (originalBlockSize == 0) {
        eof = true;
        return -1;
      }
    }

    int n = 0;
    while ((n = decompressor.decompress(b, off, len)) == 0) {
      if (decompressor.finished() || decompressor.needsDictionary()) {
        // The decompressor cannot make progress; if the whole block has
        // already been produced, there is nothing left to return.
        if (noUncompressedBytes >= originalBlockSize) {
          eof = true;
          return -1;
        }
      }
      if (decompressor.needsInput()) {
        int m;
        try {
          m = getCompressedData();
        } catch (EOFException e) {
          eof = true;
          return -1;
        }
        // Send the read data to the decompressor
        decompressor.setInput(buffer, 0, m);
      }
    }

    // Note the no. of decompressed bytes read from 'current' block
    noUncompressedBytes += n;

    return n;
  }

  /**
   * Read the next length-prefixed compressed chunk into {@code buffer},
   * growing the buffer if the chunk does not fit.
   *
   * @return the number of compressed bytes now held at the start of
   *         {@code buffer}
   * @throws EOFException if the stream ends inside the length prefix or
   *         inside the chunk body
   * @throws IOException if reading fails, or if the length prefix is negative
   */
  @Override
  protected int getCompressedData() throws IOException {
    checkStream();

    // Get the size of the compressed chunk (always non-negative in a
    // well-formed stream; reject a corrupt negative value instead of
    // handing it to the decompressor as an input length)
    int len = rawReadInt();
    if (len < 0) {
      throw new IOException(
          "Corrupt block: negative compressed chunk length " + len);
    }

    // Read len bytes from underlying stream
    if (len > buffer.length) {
      buffer = new byte[len];
    }
    int n = 0;
    while (n < len) {
      int count = in.read(buffer, n, len - n);
      if (count < 0) {
        throw new EOFException("Unexpected end of block in input stream");
      }
      n += count;
    }

    return len;
  }

  /**
   * Reset the per-block bookkeeping so that the next read begins by reading
   * a fresh block header, then delegate to the superclass.
   *
   * @throws IOException if the superclass reset throws
   */
  @Override
  public void resetState() throws IOException {
    originalBlockSize = 0;
    noUncompressedBytes = 0;
    super.resetState();
  }

  /**
   * Read a 4-byte big-endian int from the underlying stream.
   *
   * @return the decoded value (may be negative if the high bit is set)
   * @throws EOFException if the stream ends before all four bytes are read
   * @throws IOException if reading the underlying stream fails
   */
  private int rawReadInt() throws IOException {
    int b1 = in.read();
    int b2 = in.read();
    int b3 = in.read();
    int b4 = in.read();
    // read() returns -1 at end of stream, so any negative byte means EOF
    if ((b1 | b2 | b3 | b4) < 0) {
      throw new EOFException();
    }
    return ((b1 << 24) + (b2 << 16) + (b3 << 8) + b4);
  }
}