/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.NativeCodeLoader;

/**
 * This class creates snappy compressors/decompressors.
 *
 * <p>Every method that reads the buffer size dereferences the configuration
 * held by this object, so {@link #setConf(Configuration)} must have been
 * called with a non-null value before those methods are used; otherwise they
 * fail with a {@link NullPointerException}.
 */
public class SnappyCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
  // Left package-private (not private) so existing same-package code that
  // reads the field directly keeps compiling.
  Configuration conf;

  /**
   * Set the configuration to be used by this object.
   *
   * @param conf the configuration object.
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Return the configuration used by this object.
   *
   * @return the configuration object used by this object, or
   *         <code>null</code> if {@link #setConf(Configuration)} has not
   *         been called.
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Verify that the native snappy libraries are loaded and initialized.
   *
   * <p>Unlike {@link #isNativeCodeLoaded()}, this method does not return a
   * status; it returns normally only when every check passes and throws
   * otherwise. The checks run in this order: libhadoop was built with snappy
   * support, libhadoop itself is loaded, {@link SnappyCompressor} reports
   * native code loaded, {@link SnappyDecompressor} reports native code loaded.
   *
   * @throws RuntimeException if any of the checks above fails; the message
   *         identifies the first check that failed.
   */
  public static void checkNativeCodeLoaded() {
    if (!NativeCodeLoader.buildSupportsSnappy()) {
      throw new RuntimeException("native snappy library not available: " +
          "this version of libhadoop was built without " +
          "snappy support.");
    }
    if (!NativeCodeLoader.isNativeCodeLoaded()) {
      throw new RuntimeException("Failed to load libhadoop.");
    }
    if (!SnappyCompressor.isNativeCodeLoaded()) {
      throw new RuntimeException("native snappy library not available: " +
          "SnappyCompressor has not been loaded.");
    }
    if (!SnappyDecompressor.isNativeCodeLoaded()) {
      throw new RuntimeException("native snappy library not available: " +
          "SnappyDecompressor has not been loaded.");
    }
  }

  /**
   * Report whether both the snappy compressor and decompressor have their
   * native code loaded.
   *
   * @return <code>true</code> only if both {@link SnappyCompressor} and
   *         {@link SnappyDecompressor} report native code loaded.
   */
  public static boolean isNativeCodeLoaded() {
    return SnappyCompressor.isNativeCodeLoaded() &&
        SnappyDecompressor.isNativeCodeLoaded();
  }

  /**
   * Return the library name reported by {@link SnappyCompressor}.
   *
   * @return the value of {@link SnappyCompressor#getLibraryName()}.
   */
  public static String getLibraryName() {
    return SnappyCompressor.getLibraryName();
  }

  /**
   * Read the snappy buffer size from the configuration, falling back to the
   * default declared in {@link CommonConfigurationKeys}.
   *
   * <p>Single source for the lookup that the stream and (de)compressor
   * factories below all share.
   *
   * @return the configured snappy buffer size.
   */
  private int snappyBufferSize() {
    return conf.getInt(
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
  }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream}.
   *
   * @param out the location for the final output stream
   * @return a stream the user can write uncompressed data to have it compressed
   * @throws IOException if creating the stream fails
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    return CompressionCodec.Util.
        createOutputStreamWithCodecPool(this, conf, out);
  }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream} with the given {@link Compressor}.
   *
   * @param out the location for the final output stream
   * @param compressor compressor to use
   * @return a stream the user can write uncompressed data to have it compressed
   * @throws IOException if creating the stream fails
   * @throws RuntimeException if the native snappy libraries are not available
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out,
                                                    Compressor compressor)
      throws IOException {
    checkNativeCodeLoaded();
    int bufferSize = snappyBufferSize();

    // NOTE(review): presumably mirrors snappy's worst-case expansion bound
    // (input/6 + 32 extra bytes) -- confirm against the snappy library.
    int compressionOverhead = (bufferSize / 6) + 32;

    return new BlockCompressorStream(out, compressor, bufferSize,
        compressionOverhead);
  }

  /**
   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of compressor needed by this codec.
   * @throws RuntimeException if the native snappy libraries are not available
   */
  @Override
  public Class<? extends Compressor> getCompressorType() {
    checkNativeCodeLoaded();
    return SnappyCompressor.class;
  }

  /**
   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
   *
   * @return a new compressor for use by this codec
   * @throws RuntimeException if the native snappy libraries are not available
   */
  @Override
  public Compressor createCompressor() {
    checkNativeCodeLoaded();
    return new SnappyCompressor(snappyBufferSize());
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * input stream.
   *
   * @param in the stream to read compressed bytes from
   * @return a stream to read uncompressed bytes from
   * @throws IOException if creating the stream fails
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in)
      throws IOException {
    return CompressionCodec.Util.
        createInputStreamWithCodecPool(this, conf, in);
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * {@link InputStream} with the given {@link Decompressor}.
   *
   * @param in the stream to read compressed bytes from
   * @param decompressor decompressor to use
   * @return a stream to read uncompressed bytes from
   * @throws IOException if creating the stream fails
   * @throws RuntimeException if the native snappy libraries are not available
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in,
                                                  Decompressor decompressor)
      throws IOException {
    checkNativeCodeLoaded();
    return new BlockDecompressorStream(in, decompressor, snappyBufferSize());
  }

  /**
   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of decompressor needed by this codec.
   * @throws RuntimeException if the native snappy libraries are not available
   */
  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    checkNativeCodeLoaded();
    return SnappyDecompressor.class;
  }

  /**
   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
   *
   * @return a new decompressor for use by this codec
   * @throws RuntimeException if the native snappy libraries are not available
   */
  @Override
  public Decompressor createDecompressor() {
    checkNativeCodeLoaded();
    return new SnappyDecompressor(snappyBufferSize());
  }

  /**
   * {@inheritDoc}
   *
   * <p>This implementation does not throw when native code is missing; it
   * returns <code>null</code> instead, so callers must check the result.
   */
  @Override
  public DirectDecompressor createDirectDecompressor() {
    return isNativeCodeLoaded() ? new SnappyDirectDecompressor() : null;
  }

  /**
   * Get the default filename extension for this kind of compression.
   *
   * @return <code>.snappy</code>.
   */
  @Override
  public String getDefaultExtension() {
    return ".snappy";
  }
}