/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

/**
 * An output stream that feeds the bytes written to it through a
 * {@link Compressor} and writes the compressor's output to the underlying
 * stream.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CompressorStream extends CompressionOutputStream {
  /** Buffer size used when the caller does not supply one. */
  private static final int DEFAULT_BUFFER_SIZE = 512;

  /** Compressor that all written data is passed through. */
  protected Compressor compressor;
  /** Scratch buffer that receives compressed output before it is written. */
  protected byte[] buffer;
  /** Set once {@link #close()} has run. */
  protected boolean closed = false;

  /** Reusable one-element array backing {@link #write(int)}. */
  private final byte[] oneByte = new byte[1];

  /**
   * Creates a stream that compresses with the given compressor, using an
   * internal buffer of the given size.
   *
   * @param out underlying output stream
   * @param compressor compressor to pass written data through
   * @param bufferSize size in bytes of the internal output buffer
   * @throws NullPointerException if {@code out} or {@code compressor} is null
   * @throws IllegalArgumentException if {@code bufferSize} is not positive
   */
  public CompressorStream(OutputStream out, Compressor compressor,
      int bufferSize) {
    super(out);

    if (out == null || compressor == null) {
      throw new NullPointerException();
    } else if (bufferSize <= 0) {
      throw new IllegalArgumentException("Illegal bufferSize");
    }

    this.compressor = compressor;
    buffer = new byte[bufferSize];
  }

  /**
   * Creates a stream that compresses with the given compressor, using the
   * default buffer size of 512 bytes.
   *
   * @param out underlying output stream
   * @param compressor compressor to pass written data through
   */
  public CompressorStream(OutputStream out, Compressor compressor) {
    this(out, compressor, DEFAULT_BUFFER_SIZE);
  }

  /**
   * Allow derived classes to directly set the underlying stream.
   *
   * <p>This constructor does not assign {@link #compressor} or
   * {@link #buffer}; a subclass using it must set them itself before the
   * inherited write/finish methods are used.
   *
   * @param out Underlying output stream.
   */
  protected CompressorStream(OutputStream out) {
    super(out);
  }

  /**
   * Hands {@code len} bytes starting at {@code b[off]} to the compressor and
   * writes out compressed data until the compressor asks for more input.
   *
   * @param b source array
   * @param off offset of the first byte to write
   * @param len number of bytes to write
   * @throws IOException if the compressor already reports itself finished,
   *         or if writing to the underlying stream fails
   * @throws IndexOutOfBoundsException if {@code off} and {@code len} do not
   *         describe a valid range of {@code b}
   */
  public void write(byte[] b, int off, int len) throws IOException {
    // Sanity checks
    if (compressor.finished()) {
      throw new IOException("write beyond end of stream");
    }
    // OR-ing the values makes the result negative if any single term is
    // negative, which also catches int overflow of (off + len).
    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return;
    }

    compressor.setInput(b, off, len);
    while (!compressor.needsInput()) {
      compress();
    }
  }

  /**
   * Asks the compressor to fill {@link #buffer} once and writes whatever it
   * produced to the underlying stream.
   *
   * @throws IOException if compression or the write fails
   */
  protected void compress() throws IOException {
    int len = compressor.compress(buffer, 0, buffer.length);
    if (len > 0) {
      out.write(buffer, 0, len);
    }
  }

  /**
   * Signals end of input to the compressor and writes out compressed data
   * until the compressor reports it is finished. Does nothing if the
   * compressor is already finished. The underlying stream is not closed.
   *
   * @throws IOException if compression or the write fails
   */
  public void finish() throws IOException {
    if (!compressor.finished()) {
      compressor.finish();
      while (!compressor.finished()) {
        compress();
      }
    }
  }

  /**
   * Resets the compressor.
   *
   * @throws IOException declared by the signature; not thrown by the code
   *         visible here
   */
  public void resetState() throws IOException {
    compressor.reset();
  }

  /**
   * Finishes compression and closes the underlying stream. Calls after the
   * first are no-ops.
   *
   * <p>The underlying stream is closed even when {@link #finish()} throws,
   * so a compression failure cannot leak it, and the stream is marked closed
   * regardless of the outcome. If both {@code finish()} and the underlying
   * {@code close()} throw, the exception from {@code close()} is the one
   * that propagates.
   *
   * @throws IOException if finishing or closing fails
   */
  public void close() throws IOException {
    if (!closed) {
      try {
        finish();
      } finally {
        try {
          out.close();
        } finally {
          closed = true;
        }
      }
    }
  }

  /**
   * Writes a single byte by delegating to {@link #write(byte[], int, int)}.
   *
   * @param b byte to write; only the low-order eight bits are used
   * @throws IOException if the delegated write fails
   */
  public void write(int b) throws IOException {
    oneByte[0] = (byte) (b & 0xff);
    write(oneByte, 0, oneByte.length);
  }

}