001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.compress; 020 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.OutputStream; 024 025import org.apache.hadoop.conf.Configurable; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.io.compress.lz4.Lz4Compressor; 028import org.apache.hadoop.io.compress.lz4.Lz4Decompressor; 029import org.apache.hadoop.fs.CommonConfigurationKeys; 030import org.apache.hadoop.util.NativeCodeLoader; 031 032/** 033 * This class creates lz4 compressors/decompressors. 034 */ 035public class Lz4Codec implements Configurable, CompressionCodec { 036 037 static { 038 NativeCodeLoader.isNativeCodeLoaded(); 039 } 040 041 Configuration conf; 042 043 /** 044 * Set the configuration to be used by this object. 045 * 046 * @param conf the configuration object. 047 */ 048 @Override 049 public void setConf(Configuration conf) { 050 this.conf = conf; 051 } 052 053 /** 054 * Return the configuration used by this object. 055 * 056 * @return the configuration object used by this objec. 057 */ 058 @Override 059 public Configuration getConf() { 060 return conf; 061 } 062 063 /** 064 * Are the native lz4 libraries loaded & initialized? 065 * 066 * @return true if loaded & initialized, otherwise false 067 */ 068 public static boolean isNativeCodeLoaded() { 069 return NativeCodeLoader.isNativeCodeLoaded(); 070 } 071 072 public static String getLibraryName() { 073 return Lz4Compressor.getLibraryName(); 074 } 075 076 /** 077 * Create a {@link CompressionOutputStream} that will write to the given 078 * {@link OutputStream}. 079 * 080 * @param out the location for the final output stream 081 * @return a stream the user can write uncompressed data to have it compressed 082 * @throws IOException 083 */ 084 @Override 085 public CompressionOutputStream createOutputStream(OutputStream out) 086 throws IOException { 087 return CompressionCodec.Util. 088 createOutputStreamWithCodecPool(this, conf, out); 089 } 090 091 /** 092 * Create a {@link CompressionOutputStream} that will write to the given 093 * {@link OutputStream} with the given {@link Compressor}. 094 * 095 * @param out the location for the final output stream 096 * @param compressor compressor to use 097 * @return a stream the user can write uncompressed data to have it compressed 098 * @throws IOException 099 */ 100 @Override 101 public CompressionOutputStream createOutputStream(OutputStream out, 102 Compressor compressor) 103 throws IOException { 104 if (!isNativeCodeLoaded()) { 105 throw new RuntimeException("native lz4 library not available"); 106 } 107 int bufferSize = conf.getInt( 108 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, 109 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT); 110 111 int compressionOverhead = bufferSize/255 + 16; 112 113 return new BlockCompressorStream(out, compressor, bufferSize, 114 compressionOverhead); 115 } 116 117 /** 118 * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. 119 * 120 * @return the type of compressor needed by this codec. 121 */ 122 @Override 123 public Class<? extends Compressor> getCompressorType() { 124 if (!isNativeCodeLoaded()) { 125 throw new RuntimeException("native lz4 library not available"); 126 } 127 128 return Lz4Compressor.class; 129 } 130 131 /** 132 * Create a new {@link Compressor} for use by this {@link CompressionCodec}. 133 * 134 * @return a new compressor for use by this codec 135 */ 136 @Override 137 public Compressor createCompressor() { 138 if (!isNativeCodeLoaded()) { 139 throw new RuntimeException("native lz4 library not available"); 140 } 141 int bufferSize = conf.getInt( 142 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, 143 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT); 144 boolean useLz4HC = conf.getBoolean( 145 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY, 146 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_DEFAULT); 147 return new Lz4Compressor(bufferSize, useLz4HC); 148 } 149 150 /** 151 * Create a {@link CompressionInputStream} that will read from the given 152 * input stream. 153 * 154 * @param in the stream to read compressed bytes from 155 * @return a stream to read uncompressed bytes from 156 * @throws IOException 157 */ 158 @Override 159 public CompressionInputStream createInputStream(InputStream in) 160 throws IOException { 161 return CompressionCodec.Util. 162 createInputStreamWithCodecPool(this, conf, in); 163 } 164 165 /** 166 * Create a {@link CompressionInputStream} that will read from the given 167 * {@link InputStream} with the given {@link Decompressor}. 168 * 169 * @param in the stream to read compressed bytes from 170 * @param decompressor decompressor to use 171 * @return a stream to read uncompressed bytes from 172 * @throws IOException 173 */ 174 @Override 175 public CompressionInputStream createInputStream(InputStream in, 176 Decompressor decompressor) 177 throws IOException { 178 if (!isNativeCodeLoaded()) { 179 throw new RuntimeException("native lz4 library not available"); 180 } 181 182 return new BlockDecompressorStream(in, decompressor, conf.getInt( 183 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, 184 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT)); 185 } 186 187 /** 188 * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. 189 * 190 * @return the type of decompressor needed by this codec. 191 */ 192 @Override 193 public Class<? extends Decompressor> getDecompressorType() { 194 if (!isNativeCodeLoaded()) { 195 throw new RuntimeException("native lz4 library not available"); 196 } 197 198 return Lz4Decompressor.class; 199 } 200 201 /** 202 * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. 203 * 204 * @return a new decompressor for use by this codec 205 */ 206 @Override 207 public Decompressor createDecompressor() { 208 if (!isNativeCodeLoaded()) { 209 throw new RuntimeException("native lz4 library not available"); 210 } 211 int bufferSize = conf.getInt( 212 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY, 213 CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT); 214 return new Lz4Decompressor(bufferSize); 215 } 216 217 /** 218 * Get the default filename extension for this kind of compression. 219 * 220 * @return <code>.lz4</code>. 221 */ 222 @Override 223 public String getDefaultExtension() { 224 return ".lz4"; 225 } 226}