001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024
025import org.apache.hadoop.conf.Configurable;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
028import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
029import org.apache.hadoop.fs.CommonConfigurationKeys;
030import org.apache.hadoop.util.NativeCodeLoader;
031
032/**
033 * This class creates lz4 compressors/decompressors.
034 */
035public class Lz4Codec implements Configurable, CompressionCodec {
036
037  static {
038    NativeCodeLoader.isNativeCodeLoaded();
039  }
040
041  Configuration conf;
042
043  /**
044   * Set the configuration to be used by this object.
045   *
046   * @param conf the configuration object.
047   */
048  @Override
049  public void setConf(Configuration conf) {
050    this.conf = conf;
051  }
052
053  /**
054   * Return the configuration used by this object.
055   *
056   * @return the configuration object used by this objec.
057   */
058  @Override
059  public Configuration getConf() {
060    return conf;
061  }
062
063  /**
064   * Are the native lz4 libraries loaded & initialized?
065   *
066   * @return true if loaded & initialized, otherwise false
067   */
068  public static boolean isNativeCodeLoaded() {
069    return NativeCodeLoader.isNativeCodeLoaded();
070  }
071
072  public static String getLibraryName() {
073    return Lz4Compressor.getLibraryName();
074  }
075
076  /**
077   * Create a {@link CompressionOutputStream} that will write to the given
078   * {@link OutputStream}.
079   *
080   * @param out the location for the final output stream
081   * @return a stream the user can write uncompressed data to have it compressed
082   * @throws IOException
083   */
084  @Override
085  public CompressionOutputStream createOutputStream(OutputStream out)
086      throws IOException {
087    return CompressionCodec.Util.
088        createOutputStreamWithCodecPool(this, conf, out);
089  }
090
091  /**
092   * Create a {@link CompressionOutputStream} that will write to the given
093   * {@link OutputStream} with the given {@link Compressor}.
094   *
095   * @param out        the location for the final output stream
096   * @param compressor compressor to use
097   * @return a stream the user can write uncompressed data to have it compressed
098   * @throws IOException
099   */
100  @Override
101  public CompressionOutputStream createOutputStream(OutputStream out,
102                                                    Compressor compressor)
103      throws IOException {
104    if (!isNativeCodeLoaded()) {
105      throw new RuntimeException("native lz4 library not available");
106    }
107    int bufferSize = conf.getInt(
108        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
109        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
110
111    int compressionOverhead = bufferSize/255 + 16;
112
113    return new BlockCompressorStream(out, compressor, bufferSize,
114        compressionOverhead);
115  }
116
117  /**
118   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
119   *
120   * @return the type of compressor needed by this codec.
121   */
122  @Override
123  public Class<? extends Compressor> getCompressorType() {
124    if (!isNativeCodeLoaded()) {
125      throw new RuntimeException("native lz4 library not available");
126    }
127
128    return Lz4Compressor.class;
129  }
130
131  /**
132   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
133   *
134   * @return a new compressor for use by this codec
135   */
136  @Override
137  public Compressor createCompressor() {
138    if (!isNativeCodeLoaded()) {
139      throw new RuntimeException("native lz4 library not available");
140    }
141    int bufferSize = conf.getInt(
142        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
143        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
144    boolean useLz4HC = conf.getBoolean(
145        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY,
146        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_DEFAULT);
147    return new Lz4Compressor(bufferSize, useLz4HC);
148  }
149
150  /**
151   * Create a {@link CompressionInputStream} that will read from the given
152   * input stream.
153   *
154   * @param in the stream to read compressed bytes from
155   * @return a stream to read uncompressed bytes from
156   * @throws IOException
157   */
158  @Override
159  public CompressionInputStream createInputStream(InputStream in)
160      throws IOException {
161    return CompressionCodec.Util.
162        createInputStreamWithCodecPool(this, conf, in);
163  }
164
165  /**
166   * Create a {@link CompressionInputStream} that will read from the given
167   * {@link InputStream} with the given {@link Decompressor}.
168   *
169   * @param in           the stream to read compressed bytes from
170   * @param decompressor decompressor to use
171   * @return a stream to read uncompressed bytes from
172   * @throws IOException
173   */
174  @Override
175  public CompressionInputStream createInputStream(InputStream in,
176                                                  Decompressor decompressor)
177      throws IOException {
178    if (!isNativeCodeLoaded()) {
179      throw new RuntimeException("native lz4 library not available");
180    }
181
182    return new BlockDecompressorStream(in, decompressor, conf.getInt(
183        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
184        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
185  }
186
187  /**
188   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
189   *
190   * @return the type of decompressor needed by this codec.
191   */
192  @Override
193  public Class<? extends Decompressor> getDecompressorType() {
194    if (!isNativeCodeLoaded()) {
195      throw new RuntimeException("native lz4 library not available");
196    }
197
198    return Lz4Decompressor.class;
199  }
200
201  /**
202   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
203   *
204   * @return a new decompressor for use by this codec
205   */
206  @Override
207  public Decompressor createDecompressor() {
208    if (!isNativeCodeLoaded()) {
209      throw new RuntimeException("native lz4 library not available");
210    }
211    int bufferSize = conf.getInt(
212        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
213        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
214    return new Lz4Decompressor(bufferSize);
215  }
216
217  /**
218   * Get the default filename extension for this kind of compression.
219   *
220   * @return <code>.lz4</code>.
221   */
222  @Override
223  public String getDefaultExtension() {
224    return ".lz4";
225  }
226}