001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.compress; 020 021import java.io.IOException; 022import java.io.OutputStream; 023 024import org.apache.hadoop.classification.InterfaceAudience; 025import org.apache.hadoop.classification.InterfaceStability; 026import org.apache.hadoop.io.compress.CompressionOutputStream; 027import org.apache.hadoop.io.compress.Compressor; 028 029@InterfaceAudience.Public 030@InterfaceStability.Evolving 031public class CompressorStream extends CompressionOutputStream { 032 protected Compressor compressor; 033 protected byte[] buffer; 034 protected boolean closed = false; 035 036 public CompressorStream(OutputStream out, Compressor compressor, int bufferSize) { 037 super(out); 038 039 if (out == null || compressor == null) { 040 throw new NullPointerException(); 041 } else if (bufferSize <= 0) { 042 throw new IllegalArgumentException("Illegal bufferSize"); 043 } 044 045 this.compressor = compressor; 046 buffer = new byte[bufferSize]; 047 } 048 049 public CompressorStream(OutputStream out, Compressor compressor) { 050 this(out, compressor, 512); 051 } 052 053 /** 054 * Allow derived classes to directly set the underlying stream. 055 * 056 * @param out Underlying output stream. 057 */ 058 protected CompressorStream(OutputStream out) { 059 super(out); 060 } 061 062 @Override 063 public void write(byte[] b, int off, int len) throws IOException { 064 // Sanity checks 065 if (compressor.finished()) { 066 throw new IOException("write beyond end of stream"); 067 } 068 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { 069 throw new IndexOutOfBoundsException(); 070 } else if (len == 0) { 071 return; 072 } 073 074 compressor.setInput(b, off, len); 075 while (!compressor.needsInput()) { 076 compress(); 077 } 078 } 079 080 protected void compress() throws IOException { 081 int len = compressor.compress(buffer, 0, buffer.length); 082 if (len > 0) { 083 out.write(buffer, 0, len); 084 } 085 } 086 087 @Override 088 public void finish() throws IOException { 089 if (!compressor.finished()) { 090 compressor.finish(); 091 while (!compressor.finished()) { 092 compress(); 093 } 094 } 095 } 096 097 @Override 098 public void resetState() throws IOException { 099 compressor.reset(); 100 } 101 102 @Override 103 public void close() throws IOException { 104 if (!closed) { 105 try { 106 super.close(); 107 } 108 finally { 109 closed = true; 110 } 111 } 112 } 113 114 private byte[] oneByte = new byte[1]; 115 @Override 116 public void write(int b) throws IOException { 117 oneByte[0] = (byte)(b & 0xff); 118 write(oneByte, 0, oneByte.length); 119 } 120 121}