001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io.compress;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.OutputStream;
024
025import org.apache.hadoop.conf.Configurable;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
028import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
029import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
030import org.apache.hadoop.fs.CommonConfigurationKeys;
031import org.apache.hadoop.util.NativeCodeLoader;
032
033/**
034 * This class creates snappy compressors/decompressors.
035 */
036public class SnappyCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
037  Configuration conf;
038
039  /**
040   * Set the configuration to be used by this object.
041   *
042   * @param conf the configuration object.
043   */
044  @Override
045  public void setConf(Configuration conf) {
046    this.conf = conf;
047  }
048
049  /**
050   * Return the configuration used by this object.
051   *
052   * @return the configuration object used by this objec.
053   */
054  @Override
055  public Configuration getConf() {
056    return conf;
057  }
058
059  /**
060   * Are the native snappy libraries loaded & initialized?
061   */
062  public static void checkNativeCodeLoaded() {
063    if (!NativeCodeLoader.buildSupportsSnappy()) {
064      throw new RuntimeException("native snappy library not available: " +
065          "this version of libhadoop was built without " +
066          "snappy support.");
067    }
068    if (!NativeCodeLoader.isNativeCodeLoaded()) {
069      throw new RuntimeException("Failed to load libhadoop.");
070    }
071    if (!SnappyCompressor.isNativeCodeLoaded()) {
072      throw new RuntimeException("native snappy library not available: " +
073          "SnappyCompressor has not been loaded.");
074    }
075    if (!SnappyDecompressor.isNativeCodeLoaded()) {
076      throw new RuntimeException("native snappy library not available: " +
077          "SnappyDecompressor has not been loaded.");
078    }
079  }
080  
081  public static boolean isNativeCodeLoaded() {
082    return SnappyCompressor.isNativeCodeLoaded() && 
083        SnappyDecompressor.isNativeCodeLoaded();
084  }
085
086  public static String getLibraryName() {
087    return SnappyCompressor.getLibraryName();
088  }
089
090  /**
091   * Create a {@link CompressionOutputStream} that will write to the given
092   * {@link OutputStream}.
093   *
094   * @param out the location for the final output stream
095   * @return a stream the user can write uncompressed data to have it compressed
096   * @throws IOException
097   */
098  @Override
099  public CompressionOutputStream createOutputStream(OutputStream out)
100      throws IOException {
101    return CompressionCodec.Util.
102        createOutputStreamWithCodecPool(this, conf, out);
103  }
104
105  /**
106   * Create a {@link CompressionOutputStream} that will write to the given
107   * {@link OutputStream} with the given {@link Compressor}.
108   *
109   * @param out        the location for the final output stream
110   * @param compressor compressor to use
111   * @return a stream the user can write uncompressed data to have it compressed
112   * @throws IOException
113   */
114  @Override
115  public CompressionOutputStream createOutputStream(OutputStream out,
116                                                    Compressor compressor)
117      throws IOException {
118    checkNativeCodeLoaded();
119    int bufferSize = conf.getInt(
120        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
121        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
122
123    int compressionOverhead = (bufferSize / 6) + 32;
124
125    return new BlockCompressorStream(out, compressor, bufferSize,
126        compressionOverhead);
127  }
128
129  /**
130   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
131   *
132   * @return the type of compressor needed by this codec.
133   */
134  @Override
135  public Class<? extends Compressor> getCompressorType() {
136    checkNativeCodeLoaded();
137    return SnappyCompressor.class;
138  }
139
140  /**
141   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
142   *
143   * @return a new compressor for use by this codec
144   */
145  @Override
146  public Compressor createCompressor() {
147    checkNativeCodeLoaded();
148    int bufferSize = conf.getInt(
149        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
150        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
151    return new SnappyCompressor(bufferSize);
152  }
153
154  /**
155   * Create a {@link CompressionInputStream} that will read from the given
156   * input stream.
157   *
158   * @param in the stream to read compressed bytes from
159   * @return a stream to read uncompressed bytes from
160   * @throws IOException
161   */
162  @Override
163  public CompressionInputStream createInputStream(InputStream in)
164      throws IOException {
165    return CompressionCodec.Util.
166        createInputStreamWithCodecPool(this, conf, in);
167  }
168
169  /**
170   * Create a {@link CompressionInputStream} that will read from the given
171   * {@link InputStream} with the given {@link Decompressor}.
172   *
173   * @param in           the stream to read compressed bytes from
174   * @param decompressor decompressor to use
175   * @return a stream to read uncompressed bytes from
176   * @throws IOException
177   */
178  @Override
179  public CompressionInputStream createInputStream(InputStream in,
180                                                  Decompressor decompressor)
181      throws IOException {
182    checkNativeCodeLoaded();
183    return new BlockDecompressorStream(in, decompressor, conf.getInt(
184        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
185        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
186  }
187
188  /**
189   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
190   *
191   * @return the type of decompressor needed by this codec.
192   */
193  @Override
194  public Class<? extends Decompressor> getDecompressorType() {
195    checkNativeCodeLoaded();
196    return SnappyDecompressor.class;
197  }
198
199  /**
200   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
201   *
202   * @return a new decompressor for use by this codec
203   */
204  @Override
205  public Decompressor createDecompressor() {
206    checkNativeCodeLoaded();
207    int bufferSize = conf.getInt(
208        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
209        CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
210    return new SnappyDecompressor(bufferSize);
211  }
212  
213  /**
214   * {@inheritDoc}
215   */
216  @Override
217  public DirectDecompressor createDirectDecompressor() {
218    return isNativeCodeLoaded() ? new SnappyDirectDecompressor() : null;
219  }
220
221  /**
222   * Get the default filename extension for this kind of compression.
223   *
224   * @return <code>.snappy</code>.
225   */
226  @Override
227  public String getDefaultExtension() {
228    return ".snappy";
229  }
230}