/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A {@link org.apache.hadoop.io.compress.CompressorStream} which works
 * with 'block-based' compression algorithms, as opposed to
 * 'stream-based' compression algorithms.
 *
 * Note that this wrapper does not guarantee that blocks will be sized
 * for the compressor. If the
 * {@link org.apache.hadoop.io.compress.Compressor} requires buffering to
 * effect meaningful compression, it is responsible for it.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BlockCompressorStream extends CompressorStream {

  // The 'maximum' size of input data to be compressed, to account
  // for the overhead of the compression algorithm.
  private final int MAX_INPUT_SIZE;

  /**
   * Create a {@link BlockCompressorStream}.
   *
   * @param out stream
   * @param compressor compressor to be used
   * @param bufferSize size of buffer
   * @param compressionOverhead maximum 'overhead' of the compression
   *                            algorithm with given bufferSize
   */
  public BlockCompressorStream(OutputStream out, Compressor compressor,
                               int bufferSize, int compressionOverhead) {
    super(out, compressor, bufferSize);
    MAX_INPUT_SIZE = bufferSize - compressionOverhead;
  }

  /**
   * Create a {@link BlockCompressorStream} with the given output stream
   * and compressor, using a default bufferSize of 512 bytes and a
   * compressionOverhead of 18 bytes (1% of bufferSize + 12 bytes, per the
   * zlib algorithm).
   *
   * @param out stream
   * @param compressor compressor to be used
   */
  public BlockCompressorStream(OutputStream out, Compressor compressor) {
    this(out, compressor, 512, 18);
  }

  /**
   * Write the data provided to the compression codec, compressing no more
   * than the buffer size less the compression overhead as specified during
   * construction for each block.
   *
   * Each block contains the uncompressed length for the block, followed by
   * one or more length-prefixed chunks of compressed data.
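   *
   * Concretely, a write that fits within a single block produces the
   * following layout on the underlying stream, where each length is a
   * 4-byte big-endian integer (see {@code rawWriteInt} below):
   * <pre>
   *   [uncompressed length]
   *   [compressed length][compressed data]
   *   [compressed length][compressed data] ...   (one pair per chunk
   *                                               the compressor emits)
   * </pre>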
   */
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    // Sanity checks
    if (compressor.finished()) {
      throw new IOException("write beyond end of stream");
    }
    if (b == null) {
      throw new NullPointerException();
    } else if ((off < 0) || (off > b.length) || (len < 0) ||
               ((off + len) > b.length)) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return;
    }

    long limlen = compressor.getBytesRead();
    if (len + limlen > MAX_INPUT_SIZE && limlen > 0) {
      // Adding this segment would exceed the maximum size.
      // Flush data if we have it.
      finish();
      compressor.reset();
    }

    if (len > MAX_INPUT_SIZE) {
      // The data we're given exceeds the maximum size. Any buffered data
      // has already been flushed above, so we write out this chunk in
      // segments not exceeding the maximum size until it is exhausted.
      rawWriteInt(len);
      do {
        int bufLen = Math.min(len, MAX_INPUT_SIZE);

        compressor.setInput(b, off, bufLen);
        compressor.finish();
        while (!compressor.finished()) {
          compress();
        }
        compressor.reset();
        off += bufLen;
        len -= bufLen;
      } while (len > 0);
      return;
    }

    // Give data to the compressor
    compressor.setInput(b, off, len);
    if (!compressor.needsInput()) {
      // The compressor's buffer size might be smaller than the maximum
      // input size, so we permit it to flush if required.
      rawWriteInt((int)compressor.getBytesRead());
      do {
        compress();
      } while (!compressor.needsInput());
    }
  }

  /**
   * Finish the current block: write its uncompressed length, then compress
   * and emit whatever data remains in the compressor.
   */
  @Override
  public void finish() throws IOException {
    if (!compressor.finished()) {
      rawWriteInt((int)compressor.getBytesRead());
      compressor.finish();
      while (!compressor.finished()) {
        compress();
      }
    }
  }

  @Override
  protected void compress() throws IOException {
    int len = compressor.compress(buffer, 0, buffer.length);
    if (len > 0) {
      // Write out the compressed chunk, prefixed with its length
      rawWriteInt(len);
      out.write(buffer, 0, len);
    }
  }

  /**
   * Write an integer to the underlying stream as four big-endian bytes.
   */
  private void rawWriteInt(int v) throws IOException {
    out.write((v >>> 24) & 0xFF);
    out.write((v >>> 16) & 0xFF);
    out.write((v >>> 8) & 0xFF);
    out.write((v >>> 0) & 0xFF);
  }

}
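
/*
 * Usage sketch (illustrative only; the Compressor implementation and file
 * name below are assumptions, not prescribed by this class):
 *
 *   Compressor compressor = ...; // any block-oriented Compressor, e.g.
 *                                // the one SnappyCodec pairs with this
 *                                // stream
 *   try (BlockCompressorStream out =
 *            new BlockCompressorStream(new FileOutputStream("data.block"),
 *                                      compressor)) {
 *     out.write(data, 0, data.length);
 *   } // close() calls finish(), flushing the final block
 */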