001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.compress; 020 021import java.io.IOException; 022import java.io.OutputStream; 023 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026 027/** 028 * A {@link org.apache.hadoop.io.compress.CompressorStream} which works 029 * with 'block-based' based compression algorithms, as opposed to 030 * 'stream-based' compression algorithms. 031 * 032 * It should be noted that this wrapper does not guarantee that blocks will 033 * be sized for the compressor. If the 034 * {@link org.apache.hadoop.io.compress.Compressor} requires buffering to 035 * effect meaningful compression, it is responsible for it. 036 */ 037@InterfaceAudience.Public 038@InterfaceStability.Evolving 039public class BlockCompressorStream extends CompressorStream { 040 041 // The 'maximum' size of input data to be compressed, to account 042 // for the overhead of the compression algorithm. 043 private final int MAX_INPUT_SIZE; 044 045 /** 046 * Create a {@link BlockCompressorStream}. 047 * 048 * @param out stream 049 * @param compressor compressor to be used 050 * @param bufferSize size of buffer 051 * @param compressionOverhead maximum 'overhead' of the compression 052 * algorithm with given bufferSize 053 */ 054 public BlockCompressorStream(OutputStream out, Compressor compressor, 055 int bufferSize, int compressionOverhead) { 056 super(out, compressor, bufferSize); 057 MAX_INPUT_SIZE = bufferSize - compressionOverhead; 058 } 059 060 /** 061 * Create a {@link BlockCompressorStream} with given output-stream and 062 * compressor. 063 * Use default of 512 as bufferSize and compressionOverhead of 064 * (1% of bufferSize + 12 bytes) = 18 bytes (zlib algorithm). 065 * 066 * @param out stream 067 * @param compressor compressor to be used 068 */ 069 public BlockCompressorStream(OutputStream out, Compressor compressor) { 070 this(out, compressor, 512, 18); 071 } 072 073 /** 074 * Write the data provided to the compression codec, compressing no more 075 * than the buffer size less the compression overhead as specified during 076 * construction for each block. 077 * 078 * Each block contains the uncompressed length for the block, followed by 079 * one or more length-prefixed blocks of compressed data. 080 */ 081 public void write(byte[] b, int off, int len) throws IOException { 082 // Sanity checks 083 if (compressor.finished()) { 084 throw new IOException("write beyond end of stream"); 085 } 086 if (b == null) { 087 throw new NullPointerException(); 088 } else if ((off < 0) || (off > b.length) || (len < 0) || 089 ((off + len) > b.length)) { 090 throw new IndexOutOfBoundsException(); 091 } else if (len == 0) { 092 return; 093 } 094 095 long limlen = compressor.getBytesRead(); 096 if (len + limlen > MAX_INPUT_SIZE && limlen > 0) { 097 // Adding this segment would exceed the maximum size. 098 // Flush data if we have it. 099 finish(); 100 compressor.reset(); 101 } 102 103 if (len > MAX_INPUT_SIZE) { 104 // The data we're given exceeds the maximum size. Any data 105 // we had have been flushed, so we write out this chunk in segments 106 // not exceeding the maximum size until it is exhausted. 107 rawWriteInt(len); 108 do { 109 int bufLen = Math.min(len, MAX_INPUT_SIZE); 110 111 compressor.setInput(b, off, bufLen); 112 compressor.finish(); 113 while (!compressor.finished()) { 114 compress(); 115 } 116 compressor.reset(); 117 off += bufLen; 118 len -= bufLen; 119 } while (len > 0); 120 return; 121 } 122 123 // Give data to the compressor 124 compressor.setInput(b, off, len); 125 if (!compressor.needsInput()) { 126 // compressor buffer size might be smaller than the maximum 127 // size, so we permit it to flush if required. 128 rawWriteInt((int)compressor.getBytesRead()); 129 do { 130 compress(); 131 } while (!compressor.needsInput()); 132 } 133 } 134 135 public void finish() throws IOException { 136 if (!compressor.finished()) { 137 rawWriteInt((int)compressor.getBytesRead()); 138 compressor.finish(); 139 while (!compressor.finished()) { 140 compress(); 141 } 142 } 143 } 144 145 protected void compress() throws IOException { 146 int len = compressor.compress(buffer, 0, buffer.length); 147 if (len > 0) { 148 // Write out the compressed chunk 149 rawWriteInt(len); 150 out.write(buffer, 0, len); 151 } 152 } 153 154 private void rawWriteInt(int v) throws IOException { 155 out.write((v >>> 24) & 0xFF); 156 out.write((v >>> 16) & 0xFF); 157 out.write((v >>> 8) & 0xFF); 158 out.write((v >>> 0) & 0xFF); 159 } 160 161}