001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.compress; 020 021import java.io.BufferedInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025 026import org.apache.commons.io.Charsets; 027import org.apache.hadoop.conf.Configurable; 028import org.apache.hadoop.conf.Configuration; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.fs.Seekable; 033import org.apache.hadoop.io.compress.bzip2.BZip2Constants; 034import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; 035import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream; 036import org.apache.hadoop.io.compress.bzip2.Bzip2Factory; 037 038/** 039 * This class provides output and input streams for bzip2 compression 040 * and decompression. It uses the native bzip2 library on the system 041 * if possible, else it uses a pure-Java implementation of the bzip2 042 * algorithm. The configuration parameter 043 * io.compression.codec.bzip2.library can be used to control this 044 * behavior. 045 * 046 * In the pure-Java mode, the Compressor and Decompressor interfaces 047 * are not implemented. Therefore, in that mode, those methods of 048 * CompressionCodec which have a Compressor or Decompressor type 049 * argument, throw UnsupportedOperationException. 050 * 051 * Currently, support for splittability is available only in the 052 * pure-Java mode; therefore, if a SplitCompressionInputStream is 053 * requested, the pure-Java implementation is used, regardless of the 054 * setting of the configuration parameter mentioned above. 055 */ 056@InterfaceAudience.Public 057@InterfaceStability.Evolving 058public class BZip2Codec implements Configurable, SplittableCompressionCodec { 059 060 private static final String HEADER = "BZ"; 061 private static final int HEADER_LEN = HEADER.length(); 062 private static final String SUB_HEADER = "h9"; 063 private static final int SUB_HEADER_LEN = SUB_HEADER.length(); 064 065 private Configuration conf; 066 067 /** 068 * Set the configuration to be used by this object. 069 * 070 * @param conf the configuration object. 071 */ 072 @Override 073 public void setConf(Configuration conf) { 074 this.conf = conf; 075 } 076 077 /** 078 * Return the configuration used by this object. 079 * 080 * @return the configuration object used by this objec. 081 */ 082 @Override 083 public Configuration getConf() { 084 return conf; 085 } 086 087 /** 088 * Creates a new instance of BZip2Codec. 089 */ 090 public BZip2Codec() { } 091 092 /** 093 * Create a {@link CompressionOutputStream} that will write to the given 094 * {@link OutputStream}. 095 * 096 * @param out the location for the final output stream 097 * @return a stream the user can write uncompressed data to, to have it 098 * compressed 099 * @throws IOException 100 */ 101 @Override 102 public CompressionOutputStream createOutputStream(OutputStream out) 103 throws IOException { 104 return CompressionCodec.Util. 105 createOutputStreamWithCodecPool(this, conf, out); 106 } 107 108 /** 109 * Create a {@link CompressionOutputStream} that will write to the given 110 * {@link OutputStream} with the given {@link Compressor}. 111 * 112 * @param out the location for the final output stream 113 * @param compressor compressor to use 114 * @return a stream the user can write uncompressed data to, to have it 115 * compressed 116 * @throws IOException 117 */ 118 @Override 119 public CompressionOutputStream createOutputStream(OutputStream out, 120 Compressor compressor) throws IOException { 121 return Bzip2Factory.isNativeBzip2Loaded(conf) ? 122 new CompressorStream(out, compressor, 123 conf.getInt("io.file.buffer.size", 4*1024)) : 124 new BZip2CompressionOutputStream(out); 125 } 126 127 /** 128 * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. 129 * 130 * @return the type of compressor needed by this codec. 131 */ 132 @Override 133 public Class<? extends Compressor> getCompressorType() { 134 return Bzip2Factory.getBzip2CompressorType(conf); 135 } 136 137 /** 138 * Create a new {@link Compressor} for use by this {@link CompressionCodec}. 139 * 140 * @return a new compressor for use by this codec 141 */ 142 @Override 143 public Compressor createCompressor() { 144 return Bzip2Factory.getBzip2Compressor(conf); 145 } 146 147 /** 148 * Create a {@link CompressionInputStream} that will read from the given 149 * input stream and return a stream for uncompressed data. 150 * 151 * @param in the stream to read compressed bytes from 152 * @return a stream to read uncompressed bytes from 153 * @throws IOException 154 */ 155 @Override 156 public CompressionInputStream createInputStream(InputStream in) 157 throws IOException { 158 return CompressionCodec.Util. 159 createInputStreamWithCodecPool(this, conf, in); 160 } 161 162 /** 163 * Create a {@link CompressionInputStream} that will read from the given 164 * {@link InputStream} with the given {@link Decompressor}, and return a 165 * stream for uncompressed data. 166 * 167 * @param in the stream to read compressed bytes from 168 * @param decompressor decompressor to use 169 * @return a stream to read uncompressed bytes from 170 * @throws IOException 171 */ 172 @Override 173 public CompressionInputStream createInputStream(InputStream in, 174 Decompressor decompressor) throws IOException { 175 return Bzip2Factory.isNativeBzip2Loaded(conf) ? 176 new DecompressorStream(in, decompressor, 177 conf.getInt("io.file.buffer.size", 4*1024)) : 178 new BZip2CompressionInputStream(in); 179 } 180 181 /** 182 * Creates CompressionInputStream to be used to read off uncompressed data 183 * in one of the two reading modes. i.e. Continuous or Blocked reading modes 184 * 185 * @param seekableIn The InputStream 186 * @param start The start offset into the compressed stream 187 * @param end The end offset into the compressed stream 188 * @param readMode Controls whether progress is reported continuously or 189 * only at block boundaries. 190 * 191 * @return CompressionInputStream for BZip2 aligned at block boundaries 192 */ 193 public SplitCompressionInputStream createInputStream(InputStream seekableIn, 194 Decompressor decompressor, long start, long end, READ_MODE readMode) 195 throws IOException { 196 197 if (!(seekableIn instanceof Seekable)) { 198 throw new IOException("seekableIn must be an instance of " + 199 Seekable.class.getName()); 200 } 201 202 //find the position of first BZip2 start up marker 203 ((Seekable)seekableIn).seek(0); 204 205 // BZip2 start of block markers are of 6 bytes. But the very first block 206 // also has "BZh9", making it 10 bytes. This is the common case. But at 207 // time stream might start without a leading BZ. 208 final long FIRST_BZIP2_BLOCK_MARKER_POSITION = 209 CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn); 210 long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION); 211 212 ((Seekable)seekableIn).seek(adjStart); 213 SplitCompressionInputStream in = 214 new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode); 215 216 217 // The following if clause handles the following case: 218 // Assume the following scenario in BZip2 compressed stream where 219 // . represent compressed data. 220 // .....[48 bit Block].....[48 bit Block].....[48 bit Block]... 221 // ........................[47 bits][1 bit].....[48 bit Block]... 222 // ................................^[Assume a Byte alignment here] 223 // ........................................^^[current position of stream] 224 // .....................^^[We go back 10 Bytes in stream and find a Block marker] 225 // ........................................^^[We align at wrong position!] 226 // ...........................................................^^[While this pos is correct] 227 228 if (in.getPos() < start) { 229 ((Seekable)seekableIn).seek(start); 230 in = new BZip2CompressionInputStream(seekableIn, start, end, readMode); 231 } 232 233 return in; 234 } 235 236 /** 237 * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. 238 * 239 * @return the type of decompressor needed by this codec. 240 */ 241 @Override 242 public Class<? extends Decompressor> getDecompressorType() { 243 return Bzip2Factory.getBzip2DecompressorType(conf); 244 } 245 246 /** 247 * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. 248 * 249 * @return a new decompressor for use by this codec 250 */ 251 @Override 252 public Decompressor createDecompressor() { 253 return Bzip2Factory.getBzip2Decompressor(conf); 254 } 255 256 /** 257 * .bz2 is recognized as the default extension for compressed BZip2 files 258 * 259 * @return A String telling the default bzip2 file extension 260 */ 261 @Override 262 public String getDefaultExtension() { 263 return ".bz2"; 264 } 265 266 private static class BZip2CompressionOutputStream extends 267 CompressionOutputStream { 268 269 // class data starts here// 270 private CBZip2OutputStream output; 271 private boolean needsReset; 272 // class data ends here// 273 274 public BZip2CompressionOutputStream(OutputStream out) 275 throws IOException { 276 super(out); 277 needsReset = true; 278 } 279 280 private void writeStreamHeader() throws IOException { 281 if (super.out != null) { 282 // The compressed bzip2 stream should start with the 283 // identifying characters BZ. Caller of CBZip2OutputStream 284 // i.e. this class must write these characters. 285 out.write(HEADER.getBytes(Charsets.UTF_8)); 286 } 287 } 288 289 public void finish() throws IOException { 290 if (needsReset) { 291 // In the case that nothing is written to this stream, we still need to 292 // write out the header before closing, otherwise the stream won't be 293 // recognized by BZip2CompressionInputStream. 294 internalReset(); 295 } 296 this.output.finish(); 297 needsReset = true; 298 } 299 300 private void internalReset() throws IOException { 301 if (needsReset) { 302 needsReset = false; 303 writeStreamHeader(); 304 this.output = new CBZip2OutputStream(out); 305 } 306 } 307 308 public void resetState() throws IOException { 309 // Cannot write to out at this point because out might not be ready 310 // yet, as in SequenceFile.Writer implementation. 311 needsReset = true; 312 } 313 314 public void write(int b) throws IOException { 315 if (needsReset) { 316 internalReset(); 317 } 318 this.output.write(b); 319 } 320 321 public void write(byte[] b, int off, int len) throws IOException { 322 if (needsReset) { 323 internalReset(); 324 } 325 this.output.write(b, off, len); 326 } 327 328 public void close() throws IOException { 329 if (needsReset) { 330 // In the case that nothing is written to this stream, we still need to 331 // write out the header before closing, otherwise the stream won't be 332 // recognized by BZip2CompressionInputStream. 333 internalReset(); 334 } 335 this.output.flush(); 336 this.output.close(); 337 needsReset = true; 338 } 339 340 }// end of class BZip2CompressionOutputStream 341 342 /** 343 * This class is capable to de-compress BZip2 data in two modes; 344 * CONTINOUS and BYBLOCK. BYBLOCK mode makes it possible to 345 * do decompression starting any arbitrary position in the stream. 346 * 347 * So this facility can easily be used to parallelize decompression 348 * of a large BZip2 file for performance reasons. (It is exactly 349 * done so for Hadoop framework. See LineRecordReader for an 350 * example). So one can break the file (of course logically) into 351 * chunks for parallel processing. These "splits" should be like 352 * default Hadoop splits (e.g as in FileInputFormat getSplit metod). 353 * So this code is designed and tested for FileInputFormat's way 354 * of splitting only. 355 */ 356 357 private static class BZip2CompressionInputStream extends 358 SplitCompressionInputStream { 359 360 // class data starts here// 361 private CBZip2InputStream input; 362 boolean needsReset; 363 private BufferedInputStream bufferedIn; 364 private boolean isHeaderStripped = false; 365 private boolean isSubHeaderStripped = false; 366 private READ_MODE readMode = READ_MODE.CONTINUOUS; 367 private long startingPos = 0L; 368 369 // Following state machine handles different states of compressed stream 370 // position 371 // HOLD : Don't advertise compressed stream position 372 // ADVERTISE : Read 1 more character and advertise stream position 373 // See more comments about it before updatePos method. 374 private enum POS_ADVERTISEMENT_STATE_MACHINE { 375 HOLD, ADVERTISE 376 }; 377 378 POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; 379 long compressedStreamPosition = 0; 380 381 // class data ends here// 382 383 public BZip2CompressionInputStream(InputStream in) throws IOException { 384 this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS); 385 } 386 387 public BZip2CompressionInputStream(InputStream in, long start, long end, 388 READ_MODE readMode) throws IOException { 389 super(in, start, end); 390 needsReset = false; 391 bufferedIn = new BufferedInputStream(super.in); 392 this.startingPos = super.getPos(); 393 this.readMode = readMode; 394 if (this.startingPos == 0) { 395 // We only strip header if it is start of file 396 bufferedIn = readStreamHeader(); 397 } 398 input = new CBZip2InputStream(bufferedIn, readMode); 399 if (this.isHeaderStripped) { 400 input.updateReportedByteCount(HEADER_LEN); 401 } 402 403 if (this.isSubHeaderStripped) { 404 input.updateReportedByteCount(SUB_HEADER_LEN); 405 } 406 407 this.updatePos(false); 408 } 409 410 private BufferedInputStream readStreamHeader() throws IOException { 411 // We are flexible enough to allow the compressed stream not to 412 // start with the header of BZ. So it works fine either we have 413 // the header or not. 414 if (super.in != null) { 415 bufferedIn.mark(HEADER_LEN); 416 byte[] headerBytes = new byte[HEADER_LEN]; 417 int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN); 418 if (actualRead != -1) { 419 String header = new String(headerBytes, Charsets.UTF_8); 420 if (header.compareTo(HEADER) != 0) { 421 bufferedIn.reset(); 422 } else { 423 this.isHeaderStripped = true; 424 // In case of BYBLOCK mode, we also want to strip off 425 // remaining two character of the header. 426 if (this.readMode == READ_MODE.BYBLOCK) { 427 actualRead = bufferedIn.read(headerBytes, 0, 428 SUB_HEADER_LEN); 429 if (actualRead != -1) { 430 this.isSubHeaderStripped = true; 431 } 432 } 433 } 434 } 435 } 436 437 if (bufferedIn == null) { 438 throw new IOException("Failed to read bzip2 stream."); 439 } 440 441 return bufferedIn; 442 443 }// end of method 444 445 public void close() throws IOException { 446 if (!needsReset) { 447 input.close(); 448 needsReset = true; 449 } 450 } 451 452 /** 453 * This method updates compressed stream position exactly when the 454 * client of this code has read off at least one byte passed any BZip2 455 * end of block marker. 456 * 457 * This mechanism is very helpful to deal with data level record 458 * boundaries. Please see constructor and next methods of 459 * org.apache.hadoop.mapred.LineRecordReader as an example usage of this 460 * feature. We elaborate it with an example in the following: 461 * 462 * Assume two different scenarios of the BZip2 compressed stream, where 463 * [m] represent end of block, \n is line delimiter and . represent compressed 464 * data. 465 * 466 * ............[m]......\n....... 467 * 468 * ..........\n[m]......\n....... 469 * 470 * Assume that end is right after [m]. In the first case the reading 471 * will stop at \n and there is no need to read one more line. (To see the 472 * reason of reading one more line in the next() method is explained in LineRecordReader.) 473 * While in the second example LineRecordReader needs to read one more line 474 * (till the second \n). Now since BZip2Codecs only update position 475 * at least one byte passed a maker, so it is straight forward to differentiate 476 * between the two cases mentioned. 477 * 478 */ 479 480 public int read(byte[] b, int off, int len) throws IOException { 481 if (needsReset) { 482 internalReset(); 483 } 484 485 int result = 0; 486 result = this.input.read(b, off, len); 487 if (result == BZip2Constants.END_OF_BLOCK) { 488 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE; 489 } 490 491 if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) { 492 result = this.input.read(b, off, off + 1); 493 // This is the precise time to update compressed stream position 494 // to the client of this code. 495 this.updatePos(true); 496 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; 497 } 498 499 return result; 500 501 } 502 503 public int read() throws IOException { 504 byte b[] = new byte[1]; 505 int result = this.read(b, 0, 1); 506 return (result < 0) ? result : (b[0] & 0xff); 507 } 508 509 private void internalReset() throws IOException { 510 if (needsReset) { 511 needsReset = false; 512 BufferedInputStream bufferedIn = readStreamHeader(); 513 input = new CBZip2InputStream(bufferedIn, this.readMode); 514 } 515 } 516 517 public void resetState() throws IOException { 518 // Cannot read from bufferedIn at this point because bufferedIn 519 // might not be ready 520 // yet, as in SequenceFile.Reader implementation. 521 needsReset = true; 522 } 523 524 public long getPos() { 525 return this.compressedStreamPosition; 526 } 527 528 /* 529 * As the comments before read method tell that 530 * compressed stream is advertised when at least 531 * one byte passed EOB have been read off. But 532 * there is an exception to this rule. When we 533 * construct the stream we advertise the position 534 * exactly at EOB. In the following method 535 * shouldAddOn boolean captures this exception. 536 * 537 */ 538 private void updatePos(boolean shouldAddOn) { 539 int addOn = shouldAddOn ? 1 : 0; 540 this.compressedStreamPosition = this.startingPos 541 + this.input.getProcessedByteCount() + addOn; 542 } 543 544 }// end of BZip2CompressionInputStream 545 546}