001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io.compress;
020
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
036
037 /**
038 * This class provides output and input streams for bzip2 compression
039 * and decompression. It uses the native bzip2 library on the system
040 * if possible, else it uses a pure-Java implementation of the bzip2
041 * algorithm. The configuration parameter
042 * io.compression.codec.bzip2.library can be used to control this
043 * behavior.
044 *
045 * In the pure-Java mode, the Compressor and Decompressor interfaces
046 * are not implemented. Therefore, in that mode, those methods of
047 * CompressionCodec which have a Compressor or Decompressor type
048 * argument, throw UnsupportedOperationException.
049 *
050 * Currently, support for splittability is available only in the
051 * pure-Java mode; therefore, if a SplitCompressionInputStream is
052 * requested, the pure-Java implementation is used, regardless of the
053 * setting of the configuration parameter mentioned above.
054 */
055 @InterfaceAudience.Public
056 @InterfaceStability.Evolving
057 public class BZip2Codec implements Configurable, SplittableCompressionCodec {
058
059 private static final String HEADER = "BZ";
060 private static final int HEADER_LEN = HEADER.length();
061 private static final String SUB_HEADER = "h9";
062 private static final int SUB_HEADER_LEN = SUB_HEADER.length();
063
064 private Configuration conf;
065
066 /**
067 * Set the configuration to be used by this object.
068 *
069 * @param conf the configuration object.
070 */
071 @Override
072 public void setConf(Configuration conf) {
073 this.conf = conf;
074 }
075
076 /**
077 * Return the configuration used by this object.
078 *
079 * @return the configuration object used by this objec.
080 */
081 @Override
082 public Configuration getConf() {
083 return conf;
084 }
085
086 /**
087 * Creates a new instance of BZip2Codec.
088 */
089 public BZip2Codec() { }
090
091 /**
092 * Create a {@link CompressionOutputStream} that will write to the given
093 * {@link OutputStream}.
094 *
095 * @param out the location for the final output stream
096 * @return a stream the user can write uncompressed data to, to have it
097 * compressed
098 * @throws IOException
099 */
100 @Override
101 public CompressionOutputStream createOutputStream(OutputStream out)
102 throws IOException {
103 return CompressionCodec.Util.
104 createOutputStreamWithCodecPool(this, conf, out);
105 }
106
107 /**
108 * Create a {@link CompressionOutputStream} that will write to the given
109 * {@link OutputStream} with the given {@link Compressor}.
110 *
111 * @param out the location for the final output stream
112 * @param compressor compressor to use
113 * @return a stream the user can write uncompressed data to, to have it
114 * compressed
115 * @throws IOException
116 */
117 @Override
118 public CompressionOutputStream createOutputStream(OutputStream out,
119 Compressor compressor) throws IOException {
120 return Bzip2Factory.isNativeBzip2Loaded(conf) ?
121 new CompressorStream(out, compressor,
122 conf.getInt("io.file.buffer.size", 4*1024)) :
123 new BZip2CompressionOutputStream(out);
124 }
125
126 /**
127 * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
128 *
129 * @return the type of compressor needed by this codec.
130 */
131 @Override
132 public Class<? extends Compressor> getCompressorType() {
133 return Bzip2Factory.getBzip2CompressorType(conf);
134 }
135
136 /**
137 * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
138 *
139 * @return a new compressor for use by this codec
140 */
141 @Override
142 public Compressor createCompressor() {
143 return Bzip2Factory.getBzip2Compressor(conf);
144 }
145
146 /**
147 * Create a {@link CompressionInputStream} that will read from the given
148 * input stream and return a stream for uncompressed data.
149 *
150 * @param in the stream to read compressed bytes from
151 * @return a stream to read uncompressed bytes from
152 * @throws IOException
153 */
154 @Override
155 public CompressionInputStream createInputStream(InputStream in)
156 throws IOException {
157 return CompressionCodec.Util.
158 createInputStreamWithCodecPool(this, conf, in);
159 }
160
161 /**
162 * Create a {@link CompressionInputStream} that will read from the given
163 * {@link InputStream} with the given {@link Decompressor}, and return a
164 * stream for uncompressed data.
165 *
166 * @param in the stream to read compressed bytes from
167 * @param decompressor decompressor to use
168 * @return a stream to read uncompressed bytes from
169 * @throws IOException
170 */
171 @Override
172 public CompressionInputStream createInputStream(InputStream in,
173 Decompressor decompressor) throws IOException {
174 return Bzip2Factory.isNativeBzip2Loaded(conf) ?
175 new DecompressorStream(in, decompressor,
176 conf.getInt("io.file.buffer.size", 4*1024)) :
177 new BZip2CompressionInputStream(in);
178 }
179
180 /**
181 * Creates CompressionInputStream to be used to read off uncompressed data
182 * in one of the two reading modes. i.e. Continuous or Blocked reading modes
183 *
184 * @param seekableIn The InputStream
185 * @param start The start offset into the compressed stream
186 * @param end The end offset into the compressed stream
187 * @param readMode Controls whether progress is reported continuously or
188 * only at block boundaries.
189 *
190 * @return CompressionInputStream for BZip2 aligned at block boundaries
191 */
192 public SplitCompressionInputStream createInputStream(InputStream seekableIn,
193 Decompressor decompressor, long start, long end, READ_MODE readMode)
194 throws IOException {
195
196 if (!(seekableIn instanceof Seekable)) {
197 throw new IOException("seekableIn must be an instance of " +
198 Seekable.class.getName());
199 }
200
201 //find the position of first BZip2 start up marker
202 ((Seekable)seekableIn).seek(0);
203
204 // BZip2 start of block markers are of 6 bytes. But the very first block
205 // also has "BZh9", making it 10 bytes. This is the common case. But at
206 // time stream might start without a leading BZ.
207 final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
208 CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
209 long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
210
211 ((Seekable)seekableIn).seek(adjStart);
212 SplitCompressionInputStream in =
213 new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
214
215
216 // The following if clause handles the following case:
217 // Assume the following scenario in BZip2 compressed stream where
218 // . represent compressed data.
219 // .....[48 bit Block].....[48 bit Block].....[48 bit Block]...
220 // ........................[47 bits][1 bit].....[48 bit Block]...
221 // ................................^[Assume a Byte alignment here]
222 // ........................................^^[current position of stream]
223 // .....................^^[We go back 10 Bytes in stream and find a Block marker]
224 // ........................................^^[We align at wrong position!]
225 // ...........................................................^^[While this pos is correct]
226
227 if (in.getPos() <= start) {
228 ((Seekable)seekableIn).seek(start);
229 in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
230 }
231
232 return in;
233 }
234
235 /**
236 * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
237 *
238 * @return the type of decompressor needed by this codec.
239 */
240 @Override
241 public Class<? extends Decompressor> getDecompressorType() {
242 return Bzip2Factory.getBzip2DecompressorType(conf);
243 }
244
245 /**
246 * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
247 *
248 * @return a new decompressor for use by this codec
249 */
250 @Override
251 public Decompressor createDecompressor() {
252 return Bzip2Factory.getBzip2Decompressor(conf);
253 }
254
255 /**
256 * .bz2 is recognized as the default extension for compressed BZip2 files
257 *
258 * @return A String telling the default bzip2 file extension
259 */
260 @Override
261 public String getDefaultExtension() {
262 return ".bz2";
263 }
264
265 private static class BZip2CompressionOutputStream extends
266 CompressionOutputStream {
267
268 // class data starts here//
269 private CBZip2OutputStream output;
270 private boolean needsReset;
271 // class data ends here//
272
273 public BZip2CompressionOutputStream(OutputStream out)
274 throws IOException {
275 super(out);
276 needsReset = true;
277 }
278
279 private void writeStreamHeader() throws IOException {
280 if (super.out != null) {
281 // The compressed bzip2 stream should start with the
282 // identifying characters BZ. Caller of CBZip2OutputStream
283 // i.e. this class must write these characters.
284 out.write(HEADER.getBytes());
285 }
286 }
287
288 public void finish() throws IOException {
289 if (needsReset) {
290 // In the case that nothing is written to this stream, we still need to
291 // write out the header before closing, otherwise the stream won't be
292 // recognized by BZip2CompressionInputStream.
293 internalReset();
294 }
295 this.output.finish();
296 needsReset = true;
297 }
298
299 private void internalReset() throws IOException {
300 if (needsReset) {
301 needsReset = false;
302 writeStreamHeader();
303 this.output = new CBZip2OutputStream(out);
304 }
305 }
306
307 public void resetState() throws IOException {
308 // Cannot write to out at this point because out might not be ready
309 // yet, as in SequenceFile.Writer implementation.
310 needsReset = true;
311 }
312
313 public void write(int b) throws IOException {
314 if (needsReset) {
315 internalReset();
316 }
317 this.output.write(b);
318 }
319
320 public void write(byte[] b, int off, int len) throws IOException {
321 if (needsReset) {
322 internalReset();
323 }
324 this.output.write(b, off, len);
325 }
326
327 public void close() throws IOException {
328 if (needsReset) {
329 // In the case that nothing is written to this stream, we still need to
330 // write out the header before closing, otherwise the stream won't be
331 // recognized by BZip2CompressionInputStream.
332 internalReset();
333 }
334 this.output.flush();
335 this.output.close();
336 needsReset = true;
337 }
338
339 }// end of class BZip2CompressionOutputStream
340
341 /**
342 * This class is capable to de-compress BZip2 data in two modes;
343 * CONTINOUS and BYBLOCK. BYBLOCK mode makes it possible to
344 * do decompression starting any arbitrary position in the stream.
345 *
346 * So this facility can easily be used to parallelize decompression
347 * of a large BZip2 file for performance reasons. (It is exactly
348 * done so for Hadoop framework. See LineRecordReader for an
349 * example). So one can break the file (of course logically) into
350 * chunks for parallel processing. These "splits" should be like
351 * default Hadoop splits (e.g as in FileInputFormat getSplit metod).
352 * So this code is designed and tested for FileInputFormat's way
353 * of splitting only.
354 */
355
356 private static class BZip2CompressionInputStream extends
357 SplitCompressionInputStream {
358
359 // class data starts here//
360 private CBZip2InputStream input;
361 boolean needsReset;
362 private BufferedInputStream bufferedIn;
363 private boolean isHeaderStripped = false;
364 private boolean isSubHeaderStripped = false;
365 private READ_MODE readMode = READ_MODE.CONTINUOUS;
366 private long startingPos = 0L;
367
368 // Following state machine handles different states of compressed stream
369 // position
370 // HOLD : Don't advertise compressed stream position
371 // ADVERTISE : Read 1 more character and advertise stream position
372 // See more comments about it before updatePos method.
373 private enum POS_ADVERTISEMENT_STATE_MACHINE {
374 HOLD, ADVERTISE
375 };
376
377 POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
378 long compressedStreamPosition = 0;
379
380 // class data ends here//
381
382 public BZip2CompressionInputStream(InputStream in) throws IOException {
383 this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
384 }
385
386 public BZip2CompressionInputStream(InputStream in, long start, long end,
387 READ_MODE readMode) throws IOException {
388 super(in, start, end);
389 needsReset = false;
390 bufferedIn = new BufferedInputStream(super.in);
391 this.startingPos = super.getPos();
392 this.readMode = readMode;
393 if (this.startingPos == 0) {
394 // We only strip header if it is start of file
395 bufferedIn = readStreamHeader();
396 }
397 input = new CBZip2InputStream(bufferedIn, readMode);
398 if (this.isHeaderStripped) {
399 input.updateReportedByteCount(HEADER_LEN);
400 }
401
402 if (this.isSubHeaderStripped) {
403 input.updateReportedByteCount(SUB_HEADER_LEN);
404 }
405
406 this.updatePos(false);
407 }
408
409 private BufferedInputStream readStreamHeader() throws IOException {
410 // We are flexible enough to allow the compressed stream not to
411 // start with the header of BZ. So it works fine either we have
412 // the header or not.
413 if (super.in != null) {
414 bufferedIn.mark(HEADER_LEN);
415 byte[] headerBytes = new byte[HEADER_LEN];
416 int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
417 if (actualRead != -1) {
418 String header = new String(headerBytes);
419 if (header.compareTo(HEADER) != 0) {
420 bufferedIn.reset();
421 } else {
422 this.isHeaderStripped = true;
423 // In case of BYBLOCK mode, we also want to strip off
424 // remaining two character of the header.
425 if (this.readMode == READ_MODE.BYBLOCK) {
426 actualRead = bufferedIn.read(headerBytes, 0,
427 SUB_HEADER_LEN);
428 if (actualRead != -1) {
429 this.isSubHeaderStripped = true;
430 }
431 }
432 }
433 }
434 }
435
436 if (bufferedIn == null) {
437 throw new IOException("Failed to read bzip2 stream.");
438 }
439
440 return bufferedIn;
441
442 }// end of method
443
444 public void close() throws IOException {
445 if (!needsReset) {
446 input.close();
447 needsReset = true;
448 }
449 }
450
451 /**
452 * This method updates compressed stream position exactly when the
453 * client of this code has read off at least one byte passed any BZip2
454 * end of block marker.
455 *
456 * This mechanism is very helpful to deal with data level record
457 * boundaries. Please see constructor and next methods of
458 * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
459 * feature. We elaborate it with an example in the following:
460 *
461 * Assume two different scenarios of the BZip2 compressed stream, where
462 * [m] represent end of block, \n is line delimiter and . represent compressed
463 * data.
464 *
465 * ............[m]......\n.......
466 *
467 * ..........\n[m]......\n.......
468 *
469 * Assume that end is right after [m]. In the first case the reading
470 * will stop at \n and there is no need to read one more line. (To see the
471 * reason of reading one more line in the next() method is explained in LineRecordReader.)
472 * While in the second example LineRecordReader needs to read one more line
473 * (till the second \n). Now since BZip2Codecs only update position
474 * at least one byte passed a maker, so it is straight forward to differentiate
475 * between the two cases mentioned.
476 *
477 */
478
479 public int read(byte[] b, int off, int len) throws IOException {
480 if (needsReset) {
481 internalReset();
482 }
483
484 int result = 0;
485 result = this.input.read(b, off, len);
486 if (result == BZip2Constants.END_OF_BLOCK) {
487 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
488 }
489
490 if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
491 result = this.input.read(b, off, off + 1);
492 // This is the precise time to update compressed stream position
493 // to the client of this code.
494 this.updatePos(true);
495 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
496 }
497
498 return result;
499
500 }
501
502 public int read() throws IOException {
503 byte b[] = new byte[1];
504 int result = this.read(b, 0, 1);
505 return (result < 0) ? result : (b[0] & 0xff);
506 }
507
508 private void internalReset() throws IOException {
509 if (needsReset) {
510 needsReset = false;
511 BufferedInputStream bufferedIn = readStreamHeader();
512 input = new CBZip2InputStream(bufferedIn, this.readMode);
513 }
514 }
515
516 public void resetState() throws IOException {
517 // Cannot read from bufferedIn at this point because bufferedIn
518 // might not be ready
519 // yet, as in SequenceFile.Reader implementation.
520 needsReset = true;
521 }
522
523 public long getPos() {
524 return this.compressedStreamPosition;
525 }
526
527 /*
528 * As the comments before read method tell that
529 * compressed stream is advertised when at least
530 * one byte passed EOB have been read off. But
531 * there is an exception to this rule. When we
532 * construct the stream we advertise the position
533 * exactly at EOB. In the following method
534 * shouldAddOn boolean captures this exception.
535 *
536 */
537 private void updatePos(boolean shouldAddOn) {
538 int addOn = shouldAddOn ? 1 : 0;
539 this.compressedStreamPosition = this.startingPos
540 + this.input.getProcessedByteCount() + addOn;
541 }
542
543 }// end of BZip2CompressionInputStream
544
545 }