001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io.compress;
020
021 import java.io.BufferedInputStream;
022 import java.io.IOException;
023 import java.io.InputStream;
024 import java.io.OutputStream;
025
026 import org.apache.hadoop.conf.Configurable;
027 import org.apache.hadoop.conf.Configuration;
028
029 import org.apache.hadoop.classification.InterfaceAudience;
030 import org.apache.hadoop.classification.InterfaceStability;
031 import org.apache.hadoop.fs.Seekable;
032 import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
033 import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
034 import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
035 import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
036
037 /**
038 * This class provides output and input streams for bzip2 compression
039 * and decompression. It uses the native bzip2 library on the system
040 * if possible, else it uses a pure-Java implementation of the bzip2
041 * algorithm. The configuration parameter
042 * io.compression.codec.bzip2.library can be used to control this
043 * behavior.
044 *
045 * In the pure-Java mode, the Compressor and Decompressor interfaces
046 * are not implemented. Therefore, in that mode, those methods of
047 * CompressionCodec which have a Compressor or Decompressor type
048 * argument, throw UnsupportedOperationException.
049 *
050 * Currently, support for splittability is available only in the
051 * pure-Java mode; therefore, if a SplitCompressionInputStream is
052 * requested, the pure-Java implementation is used, regardless of the
053 * setting of the configuration parameter mentioned above.
054 */
055 @InterfaceAudience.Public
056 @InterfaceStability.Evolving
057 public class BZip2Codec implements Configurable, SplittableCompressionCodec {
058
// Leading magic characters of a bzip2 stream. The nested output stream writes
// them and the nested input stream strips them when positioned at offset 0.
private static final String HEADER = "BZ";
private static final int HEADER_LEN = HEADER.length();
// The two header characters that follow HEADER. The nested input stream
// strips them as well, but only in BYBLOCK read mode.
private static final String SUB_HEADER = "h9";
private static final int SUB_HEADER_LEN = SUB_HEADER.length();

// Configuration installed through setConf(); it is handed to Bzip2Factory and
// consulted for io.file.buffer.size. Remains null until setConf() is called.
private Configuration conf;
065
066 /**
067 * Set the configuration to be used by this object.
068 *
069 * @param conf the configuration object.
070 */
071 @Override
072 public void setConf(Configuration conf) {
073 this.conf = conf;
074 }
075
076 /**
077 * Return the configuration used by this object.
078 *
079 * @return the configuration object used by this objec.
080 */
081 @Override
082 public Configuration getConf() {
083 return conf;
084 }
085
086 /**
087 * Creates a new instance of BZip2Codec.
088 */
089 public BZip2Codec() { }
090
091 /**
092 * Create a {@link CompressionOutputStream} that will write to the given
093 * {@link OutputStream}.
094 *
095 * @param out the location for the final output stream
096 * @return a stream the user can write uncompressed data to, to have it
097 * compressed
098 * @throws IOException
099 */
100 @Override
101 public CompressionOutputStream createOutputStream(OutputStream out)
102 throws IOException {
103 return createOutputStream(out, createCompressor());
104 }
105
106 /**
107 * Create a {@link CompressionOutputStream} that will write to the given
108 * {@link OutputStream} with the given {@link Compressor}.
109 *
110 * @param out the location for the final output stream
111 * @param compressor compressor to use
112 * @return a stream the user can write uncompressed data to, to have it
113 * compressed
114 * @throws IOException
115 */
116 @Override
117 public CompressionOutputStream createOutputStream(OutputStream out,
118 Compressor compressor) throws IOException {
119 return Bzip2Factory.isNativeBzip2Loaded(conf) ?
120 new CompressorStream(out, compressor,
121 conf.getInt("io.file.buffer.size", 4*1024)) :
122 new BZip2CompressionOutputStream(out);
123 }
124
125 /**
126 * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
127 *
128 * @return the type of compressor needed by this codec.
129 */
130 @Override
131 public Class<? extends Compressor> getCompressorType() {
132 return Bzip2Factory.getBzip2CompressorType(conf);
133 }
134
135 /**
136 * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
137 *
138 * @return a new compressor for use by this codec
139 */
140 @Override
141 public Compressor createCompressor() {
142 return Bzip2Factory.getBzip2Compressor(conf);
143 }
144
145 /**
146 * Create a {@link CompressionInputStream} that will read from the given
147 * input stream and return a stream for uncompressed data.
148 *
149 * @param in the stream to read compressed bytes from
150 * @return a stream to read uncompressed bytes from
151 * @throws IOException
152 */
153 @Override
154 public CompressionInputStream createInputStream(InputStream in)
155 throws IOException {
156 return createInputStream(in, createDecompressor());
157 }
158
159 /**
160 * Create a {@link CompressionInputStream} that will read from the given
161 * {@link InputStream} with the given {@link Decompressor}, and return a
162 * stream for uncompressed data.
163 *
164 * @param in the stream to read compressed bytes from
165 * @param decompressor decompressor to use
166 * @return a stream to read uncompressed bytes from
167 * @throws IOException
168 */
169 @Override
170 public CompressionInputStream createInputStream(InputStream in,
171 Decompressor decompressor) throws IOException {
172 return Bzip2Factory.isNativeBzip2Loaded(conf) ?
173 new DecompressorStream(in, decompressor,
174 conf.getInt("io.file.buffer.size", 4*1024)) :
175 new BZip2CompressionInputStream(in);
176 }
177
178 /**
179 * Creates CompressionInputStream to be used to read off uncompressed data
180 * in one of the two reading modes. i.e. Continuous or Blocked reading modes
181 *
182 * @param seekableIn The InputStream
183 * @param start The start offset into the compressed stream
184 * @param end The end offset into the compressed stream
185 * @param readMode Controls whether progress is reported continuously or
186 * only at block boundaries.
187 *
188 * @return CompressionInputStream for BZip2 aligned at block boundaries
189 */
190 public SplitCompressionInputStream createInputStream(InputStream seekableIn,
191 Decompressor decompressor, long start, long end, READ_MODE readMode)
192 throws IOException {
193
194 if (!(seekableIn instanceof Seekable)) {
195 throw new IOException("seekableIn must be an instance of " +
196 Seekable.class.getName());
197 }
198
199 //find the position of first BZip2 start up marker
200 ((Seekable)seekableIn).seek(0);
201
202 // BZip2 start of block markers are of 6 bytes. But the very first block
203 // also has "BZh9", making it 10 bytes. This is the common case. But at
204 // time stream might start without a leading BZ.
205 final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
206 CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
207 long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
208
209 ((Seekable)seekableIn).seek(adjStart);
210 SplitCompressionInputStream in =
211 new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
212
213
214 // The following if clause handles the following case:
215 // Assume the following scenario in BZip2 compressed stream where
216 // . represent compressed data.
217 // .....[48 bit Block].....[48 bit Block].....[48 bit Block]...
218 // ........................[47 bits][1 bit].....[48 bit Block]...
219 // ................................^[Assume a Byte alignment here]
220 // ........................................^^[current position of stream]
221 // .....................^^[We go back 10 Bytes in stream and find a Block marker]
222 // ........................................^^[We align at wrong position!]
223 // ...........................................................^^[While this pos is correct]
224
225 if (in.getPos() <= start) {
226 ((Seekable)seekableIn).seek(start);
227 in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
228 }
229
230 return in;
231 }
232
233 /**
234 * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
235 *
236 * @return the type of decompressor needed by this codec.
237 */
238 @Override
239 public Class<? extends Decompressor> getDecompressorType() {
240 return Bzip2Factory.getBzip2DecompressorType(conf);
241 }
242
243 /**
244 * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
245 *
246 * @return a new decompressor for use by this codec
247 */
248 @Override
249 public Decompressor createDecompressor() {
250 return Bzip2Factory.getBzip2Decompressor(conf);
251 }
252
253 /**
254 * .bz2 is recognized as the default extension for compressed BZip2 files
255 *
256 * @return A String telling the default bzip2 file extension
257 */
258 @Override
259 public String getDefaultExtension() {
260 return ".bz2";
261 }
262
263 private static class BZip2CompressionOutputStream extends
264 CompressionOutputStream {
265
266 // class data starts here//
267 private CBZip2OutputStream output;
268 private boolean needsReset;
269 // class data ends here//
270
271 public BZip2CompressionOutputStream(OutputStream out)
272 throws IOException {
273 super(out);
274 needsReset = true;
275 }
276
277 private void writeStreamHeader() throws IOException {
278 if (super.out != null) {
279 // The compressed bzip2 stream should start with the
280 // identifying characters BZ. Caller of CBZip2OutputStream
281 // i.e. this class must write these characters.
282 out.write(HEADER.getBytes());
283 }
284 }
285
286 public void finish() throws IOException {
287 if (needsReset) {
288 // In the case that nothing is written to this stream, we still need to
289 // write out the header before closing, otherwise the stream won't be
290 // recognized by BZip2CompressionInputStream.
291 internalReset();
292 }
293 this.output.finish();
294 needsReset = true;
295 }
296
297 private void internalReset() throws IOException {
298 if (needsReset) {
299 needsReset = false;
300 writeStreamHeader();
301 this.output = new CBZip2OutputStream(out);
302 }
303 }
304
305 public void resetState() throws IOException {
306 // Cannot write to out at this point because out might not be ready
307 // yet, as in SequenceFile.Writer implementation.
308 needsReset = true;
309 }
310
311 public void write(int b) throws IOException {
312 if (needsReset) {
313 internalReset();
314 }
315 this.output.write(b);
316 }
317
318 public void write(byte[] b, int off, int len) throws IOException {
319 if (needsReset) {
320 internalReset();
321 }
322 this.output.write(b, off, len);
323 }
324
325 public void close() throws IOException {
326 if (needsReset) {
327 // In the case that nothing is written to this stream, we still need to
328 // write out the header before closing, otherwise the stream won't be
329 // recognized by BZip2CompressionInputStream.
330 internalReset();
331 }
332 this.output.flush();
333 this.output.close();
334 needsReset = true;
335 }
336
337 }// end of class BZip2CompressionOutputStream
338
339 /**
340 * This class is capable to de-compress BZip2 data in two modes;
341 * CONTINOUS and BYBLOCK. BYBLOCK mode makes it possible to
342 * do decompression starting any arbitrary position in the stream.
343 *
344 * So this facility can easily be used to parallelize decompression
345 * of a large BZip2 file for performance reasons. (It is exactly
346 * done so for Hadoop framework. See LineRecordReader for an
347 * example). So one can break the file (of course logically) into
348 * chunks for parallel processing. These "splits" should be like
349 * default Hadoop splits (e.g as in FileInputFormat getSplit metod).
350 * So this code is designed and tested for FileInputFormat's way
351 * of splitting only.
352 */
353
354 private static class BZip2CompressionInputStream extends
355 SplitCompressionInputStream {
356
357 // class data starts here//
358 private CBZip2InputStream input;
359 boolean needsReset;
360 private BufferedInputStream bufferedIn;
361 private boolean isHeaderStripped = false;
362 private boolean isSubHeaderStripped = false;
363 private READ_MODE readMode = READ_MODE.CONTINUOUS;
364 private long startingPos = 0L;
365
366 // Following state machine handles different states of compressed stream
367 // position
368 // HOLD : Don't advertise compressed stream position
369 // ADVERTISE : Read 1 more character and advertise stream position
370 // See more comments about it before updatePos method.
371 private enum POS_ADVERTISEMENT_STATE_MACHINE {
372 HOLD, ADVERTISE
373 };
374
375 POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
376 long compressedStreamPosition = 0;
377
378 // class data ends here//
379
380 public BZip2CompressionInputStream(InputStream in) throws IOException {
381 this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
382 }
383
384 public BZip2CompressionInputStream(InputStream in, long start, long end,
385 READ_MODE readMode) throws IOException {
386 super(in, start, end);
387 needsReset = false;
388 bufferedIn = new BufferedInputStream(super.in);
389 this.startingPos = super.getPos();
390 this.readMode = readMode;
391 if (this.startingPos == 0) {
392 // We only strip header if it is start of file
393 bufferedIn = readStreamHeader();
394 }
395 input = new CBZip2InputStream(bufferedIn, readMode);
396 if (this.isHeaderStripped) {
397 input.updateReportedByteCount(HEADER_LEN);
398 }
399
400 if (this.isSubHeaderStripped) {
401 input.updateReportedByteCount(SUB_HEADER_LEN);
402 }
403
404 this.updatePos(false);
405 }
406
407 private BufferedInputStream readStreamHeader() throws IOException {
408 // We are flexible enough to allow the compressed stream not to
409 // start with the header of BZ. So it works fine either we have
410 // the header or not.
411 if (super.in != null) {
412 bufferedIn.mark(HEADER_LEN);
413 byte[] headerBytes = new byte[HEADER_LEN];
414 int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
415 if (actualRead != -1) {
416 String header = new String(headerBytes);
417 if (header.compareTo(HEADER) != 0) {
418 bufferedIn.reset();
419 } else {
420 this.isHeaderStripped = true;
421 // In case of BYBLOCK mode, we also want to strip off
422 // remaining two character of the header.
423 if (this.readMode == READ_MODE.BYBLOCK) {
424 actualRead = bufferedIn.read(headerBytes, 0,
425 SUB_HEADER_LEN);
426 if (actualRead != -1) {
427 this.isSubHeaderStripped = true;
428 }
429 }
430 }
431 }
432 }
433
434 if (bufferedIn == null) {
435 throw new IOException("Failed to read bzip2 stream.");
436 }
437
438 return bufferedIn;
439
440 }// end of method
441
442 public void close() throws IOException {
443 if (!needsReset) {
444 input.close();
445 needsReset = true;
446 }
447 }
448
449 /**
450 * This method updates compressed stream position exactly when the
451 * client of this code has read off at least one byte passed any BZip2
452 * end of block marker.
453 *
454 * This mechanism is very helpful to deal with data level record
455 * boundaries. Please see constructor and next methods of
456 * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
457 * feature. We elaborate it with an example in the following:
458 *
459 * Assume two different scenarios of the BZip2 compressed stream, where
460 * [m] represent end of block, \n is line delimiter and . represent compressed
461 * data.
462 *
463 * ............[m]......\n.......
464 *
465 * ..........\n[m]......\n.......
466 *
467 * Assume that end is right after [m]. In the first case the reading
468 * will stop at \n and there is no need to read one more line. (To see the
469 * reason of reading one more line in the next() method is explained in LineRecordReader.)
470 * While in the second example LineRecordReader needs to read one more line
471 * (till the second \n). Now since BZip2Codecs only update position
472 * at least one byte passed a maker, so it is straight forward to differentiate
473 * between the two cases mentioned.
474 *
475 */
476
477 public int read(byte[] b, int off, int len) throws IOException {
478 if (needsReset) {
479 internalReset();
480 }
481
482 int result = 0;
483 result = this.input.read(b, off, len);
484 if (result == BZip2Constants.END_OF_BLOCK) {
485 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
486 }
487
488 if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
489 result = this.input.read(b, off, off + 1);
490 // This is the precise time to update compressed stream position
491 // to the client of this code.
492 this.updatePos(true);
493 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
494 }
495
496 return result;
497
498 }
499
500 public int read() throws IOException {
501 byte b[] = new byte[1];
502 int result = this.read(b, 0, 1);
503 return (result < 0) ? result : (b[0] & 0xff);
504 }
505
506 private void internalReset() throws IOException {
507 if (needsReset) {
508 needsReset = false;
509 BufferedInputStream bufferedIn = readStreamHeader();
510 input = new CBZip2InputStream(bufferedIn, this.readMode);
511 }
512 }
513
514 public void resetState() throws IOException {
515 // Cannot read from bufferedIn at this point because bufferedIn
516 // might not be ready
517 // yet, as in SequenceFile.Reader implementation.
518 needsReset = true;
519 }
520
521 public long getPos() {
522 return this.compressedStreamPosition;
523 }
524
525 /*
526 * As the comments before read method tell that
527 * compressed stream is advertised when at least
528 * one byte passed EOB have been read off. But
529 * there is an exception to this rule. When we
530 * construct the stream we advertise the position
531 * exactly at EOB. In the following method
532 * shouldAddOn boolean captures this exception.
533 *
534 */
535 private void updatePos(boolean shouldAddOn) {
536 int addOn = shouldAddOn ? 1 : 0;
537 this.compressedStreamPosition = this.startingPos
538 + this.input.getProcessedByteCount() + addOn;
539 }
540
541 }// end of BZip2CompressionInputStream
542
543 }