001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.compress; 020 021import java.io.BufferedInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025import java.nio.charset.StandardCharsets; 026 027import org.apache.hadoop.conf.Configurable; 028import org.apache.hadoop.conf.Configuration; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.fs.Seekable; 033import org.apache.hadoop.io.compress.bzip2.BZip2Constants; 034import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; 035import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream; 036import org.apache.hadoop.io.compress.bzip2.Bzip2Factory; 037 038import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; 039import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; 040 041/** 042 * This class provides output and input streams for bzip2 compression 043 * and decompression. It uses the native bzip2 library on the system 044 * if possible, else it uses a pure-Java implementation of the bzip2 045 * algorithm. The configuration parameter 046 * io.compression.codec.bzip2.library can be used to control this 047 * behavior. 048 * 049 * In the pure-Java mode, the Compressor and Decompressor interfaces 050 * are not implemented. Therefore, in that mode, those methods of 051 * CompressionCodec which have a Compressor or Decompressor type 052 * argument, throw UnsupportedOperationException. 053 * 054 * Currently, support for splittability is available only in the 055 * pure-Java mode; therefore, if a SplitCompressionInputStream is 056 * requested, the pure-Java implementation is used, regardless of the 057 * setting of the configuration parameter mentioned above. 058 */ 059@InterfaceAudience.Public 060@InterfaceStability.Evolving 061public class BZip2Codec implements Configurable, SplittableCompressionCodec { 062 063 private static final String HEADER = "BZ"; 064 private static final int HEADER_LEN = HEADER.length(); 065 private static final String SUB_HEADER = "h9"; 066 private static final int SUB_HEADER_LEN = SUB_HEADER.length(); 067 068 private Configuration conf; 069 070 /** 071 * Set the configuration to be used by this object. 072 * 073 * @param conf the configuration object. 074 */ 075 @Override 076 public void setConf(Configuration conf) { 077 this.conf = conf; 078 } 079 080 /** 081 * Return the configuration used by this object. 082 * 083 * @return the configuration object used by this objec. 084 */ 085 @Override 086 public Configuration getConf() { 087 return conf; 088 } 089 090 /** 091 * Creates a new instance of BZip2Codec. 092 */ 093 public BZip2Codec() { } 094 095 /** 096 * Create a {@link CompressionOutputStream} that will write to the given 097 * {@link OutputStream}. 098 * 099 * @param out the location for the final output stream 100 * @return a stream the user can write uncompressed data to, to have it 101 * compressed 102 * @throws IOException 103 */ 104 @Override 105 public CompressionOutputStream createOutputStream(OutputStream out) 106 throws IOException { 107 return CompressionCodec.Util. 108 createOutputStreamWithCodecPool(this, conf, out); 109 } 110 111 /** 112 * Create a {@link CompressionOutputStream} that will write to the given 113 * {@link OutputStream} with the given {@link Compressor}. 114 * 115 * @param out the location for the final output stream 116 * @param compressor compressor to use 117 * @return a stream the user can write uncompressed data to, to have it 118 * compressed 119 * @throws IOException 120 */ 121 @Override 122 public CompressionOutputStream createOutputStream(OutputStream out, 123 Compressor compressor) throws IOException { 124 return Bzip2Factory.isNativeBzip2Loaded(conf) ? 125 new CompressorStream(out, compressor, 126 conf.getInt(IO_FILE_BUFFER_SIZE_KEY, 127 IO_FILE_BUFFER_SIZE_DEFAULT)) : 128 new BZip2CompressionOutputStream(out); 129 } 130 131 /** 132 * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. 133 * 134 * @return the type of compressor needed by this codec. 135 */ 136 @Override 137 public Class<? extends Compressor> getCompressorType() { 138 return Bzip2Factory.getBzip2CompressorType(conf); 139 } 140 141 /** 142 * Create a new {@link Compressor} for use by this {@link CompressionCodec}. 143 * 144 * @return a new compressor for use by this codec 145 */ 146 @Override 147 public Compressor createCompressor() { 148 return Bzip2Factory.getBzip2Compressor(conf); 149 } 150 151 /** 152 * Create a {@link CompressionInputStream} that will read from the given 153 * input stream and return a stream for uncompressed data. 154 * 155 * @param in the stream to read compressed bytes from 156 * @return a stream to read uncompressed bytes from 157 * @throws IOException 158 */ 159 @Override 160 public CompressionInputStream createInputStream(InputStream in) 161 throws IOException { 162 return CompressionCodec.Util. 163 createInputStreamWithCodecPool(this, conf, in); 164 } 165 166 /** 167 * Create a {@link CompressionInputStream} that will read from the given 168 * {@link InputStream} with the given {@link Decompressor}, and return a 169 * stream for uncompressed data. 170 * 171 * @param in the stream to read compressed bytes from 172 * @param decompressor decompressor to use 173 * @return a stream to read uncompressed bytes from 174 * @throws IOException 175 */ 176 @Override 177 public CompressionInputStream createInputStream(InputStream in, 178 Decompressor decompressor) throws IOException { 179 return Bzip2Factory.isNativeBzip2Loaded(conf) ? 180 new DecompressorStream(in, decompressor, 181 conf.getInt(IO_FILE_BUFFER_SIZE_KEY, 182 IO_FILE_BUFFER_SIZE_DEFAULT)) : 183 new BZip2CompressionInputStream(in); 184 } 185 186 /** 187 * Creates CompressionInputStream to be used to read off uncompressed data 188 * in one of the two reading modes. i.e. Continuous or Blocked reading modes 189 * 190 * @param seekableIn The InputStream 191 * @param start The start offset into the compressed stream 192 * @param end The end offset into the compressed stream 193 * @param readMode Controls whether progress is reported continuously or 194 * only at block boundaries. 195 * 196 * @return CompressionInputStream for BZip2 aligned at block boundaries 197 */ 198 public SplitCompressionInputStream createInputStream(InputStream seekableIn, 199 Decompressor decompressor, long start, long end, READ_MODE readMode) 200 throws IOException { 201 202 if (!(seekableIn instanceof Seekable)) { 203 throw new IOException("seekableIn must be an instance of " + 204 Seekable.class.getName()); 205 } 206 207 //find the position of first BZip2 start up marker 208 ((Seekable)seekableIn).seek(0); 209 210 // BZip2 start of block markers are of 6 bytes. But the very first block 211 // also has "BZh9", making it 10 bytes. This is the common case. But at 212 // time stream might start without a leading BZ. 213 final long FIRST_BZIP2_BLOCK_MARKER_POSITION = 214 CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn); 215 long adjStart = 0L; 216 if (start != 0) { 217 // Other than the first of file, the marker size is 6 bytes. 218 adjStart = Math.max(0L, start - (FIRST_BZIP2_BLOCK_MARKER_POSITION 219 - (HEADER_LEN + SUB_HEADER_LEN))); 220 } 221 222 ((Seekable)seekableIn).seek(adjStart); 223 SplitCompressionInputStream in = 224 new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode); 225 226 227 // The following if clause handles the following case: 228 // Assume the following scenario in BZip2 compressed stream where 229 // . represent compressed data. 230 // .....[48 bit Block].....[48 bit Block].....[48 bit Block]... 231 // ........................[47 bits][1 bit].....[48 bit Block]... 232 // ................................^[Assume a Byte alignment here] 233 // ........................................^^[current position of stream] 234 // .....................^^[We go back 10 Bytes in stream and find a Block marker] 235 // ........................................^^[We align at wrong position!] 236 // ...........................................................^^[While this pos is correct] 237 238 if (in.getPos() < start) { 239 ((Seekable)seekableIn).seek(start); 240 in = new BZip2CompressionInputStream(seekableIn, start, end, readMode); 241 } 242 243 return in; 244 } 245 246 /** 247 * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. 248 * 249 * @return the type of decompressor needed by this codec. 250 */ 251 @Override 252 public Class<? extends Decompressor> getDecompressorType() { 253 return Bzip2Factory.getBzip2DecompressorType(conf); 254 } 255 256 /** 257 * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. 258 * 259 * @return a new decompressor for use by this codec 260 */ 261 @Override 262 public Decompressor createDecompressor() { 263 return Bzip2Factory.getBzip2Decompressor(conf); 264 } 265 266 /** 267 * .bz2 is recognized as the default extension for compressed BZip2 files 268 * 269 * @return A String telling the default bzip2 file extension 270 */ 271 @Override 272 public String getDefaultExtension() { 273 return ".bz2"; 274 } 275 276 private static class BZip2CompressionOutputStream extends 277 CompressionOutputStream { 278 279 // class data starts here// 280 private CBZip2OutputStream output; 281 private boolean needsReset; 282 // class data ends here// 283 284 public BZip2CompressionOutputStream(OutputStream out) 285 throws IOException { 286 super(out); 287 needsReset = true; 288 } 289 290 private void writeStreamHeader() throws IOException { 291 if (super.out != null) { 292 // The compressed bzip2 stream should start with the 293 // identifying characters BZ. Caller of CBZip2OutputStream 294 // i.e. this class must write these characters. 295 out.write(HEADER.getBytes(StandardCharsets.UTF_8)); 296 } 297 } 298 299 public void finish() throws IOException { 300 if (needsReset) { 301 // In the case that nothing is written to this stream, we still need to 302 // write out the header before closing, otherwise the stream won't be 303 // recognized by BZip2CompressionInputStream. 304 internalReset(); 305 } 306 this.output.finish(); 307 needsReset = true; 308 } 309 310 private void internalReset() throws IOException { 311 if (needsReset) { 312 needsReset = false; 313 writeStreamHeader(); 314 this.output = new CBZip2OutputStream(out); 315 } 316 } 317 318 public void resetState() throws IOException { 319 // Cannot write to out at this point because out might not be ready 320 // yet, as in SequenceFile.Writer implementation. 321 needsReset = true; 322 } 323 324 public void write(int b) throws IOException { 325 if (needsReset) { 326 internalReset(); 327 } 328 this.output.write(b); 329 } 330 331 public void write(byte[] b, int off, int len) throws IOException { 332 if (needsReset) { 333 internalReset(); 334 } 335 this.output.write(b, off, len); 336 } 337 338 public void close() throws IOException { 339 if (needsReset) { 340 // In the case that nothing is written to this stream, we still need to 341 // write out the header before closing, otherwise the stream won't be 342 // recognized by BZip2CompressionInputStream. 343 internalReset(); 344 } 345 this.output.flush(); 346 this.output.close(); 347 needsReset = true; 348 } 349 350 }// end of class BZip2CompressionOutputStream 351 352 /** 353 * This class is capable to de-compress BZip2 data in two modes; 354 * CONTINOUS and BYBLOCK. BYBLOCK mode makes it possible to 355 * do decompression starting any arbitrary position in the stream. 356 * 357 * So this facility can easily be used to parallelize decompression 358 * of a large BZip2 file for performance reasons. (It is exactly 359 * done so for Hadoop framework. See LineRecordReader for an 360 * example). So one can break the file (of course logically) into 361 * chunks for parallel processing. These "splits" should be like 362 * default Hadoop splits (e.g as in FileInputFormat getSplit metod). 363 * So this code is designed and tested for FileInputFormat's way 364 * of splitting only. 365 */ 366 367 private static class BZip2CompressionInputStream extends 368 SplitCompressionInputStream { 369 370 // class data starts here// 371 private CBZip2InputStream input; 372 boolean needsReset; 373 private BufferedInputStream bufferedIn; 374 private boolean isHeaderStripped = false; 375 private boolean isSubHeaderStripped = false; 376 private READ_MODE readMode = READ_MODE.CONTINUOUS; 377 private long startingPos = 0L; 378 379 // Following state machine handles different states of compressed stream 380 // position 381 // HOLD : Don't advertise compressed stream position 382 // ADVERTISE : Read 1 more character and advertise stream position 383 // See more comments about it before updatePos method. 384 private enum POS_ADVERTISEMENT_STATE_MACHINE { 385 HOLD, ADVERTISE 386 }; 387 388 POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; 389 long compressedStreamPosition = 0; 390 391 // class data ends here// 392 393 public BZip2CompressionInputStream(InputStream in) throws IOException { 394 this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS); 395 } 396 397 public BZip2CompressionInputStream(InputStream in, long start, long end, 398 READ_MODE readMode) throws IOException { 399 super(in, start, end); 400 needsReset = false; 401 bufferedIn = new BufferedInputStream(super.in); 402 this.startingPos = super.getPos(); 403 this.readMode = readMode; 404 if (this.startingPos == 0) { 405 // We only strip header if it is start of file 406 bufferedIn = readStreamHeader(); 407 } 408 input = new CBZip2InputStream(bufferedIn, readMode); 409 if (this.isHeaderStripped) { 410 input.updateReportedByteCount(HEADER_LEN); 411 } 412 413 if (this.isSubHeaderStripped) { 414 input.updateReportedByteCount(SUB_HEADER_LEN); 415 } 416 417 this.updatePos(false); 418 } 419 420 private BufferedInputStream readStreamHeader() throws IOException { 421 // We are flexible enough to allow the compressed stream not to 422 // start with the header of BZ. So it works fine either we have 423 // the header or not. 424 if (super.in != null) { 425 bufferedIn.mark(HEADER_LEN); 426 byte[] headerBytes = new byte[HEADER_LEN]; 427 int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN); 428 if (actualRead != -1) { 429 String header = new String(headerBytes, StandardCharsets.UTF_8); 430 if (header.compareTo(HEADER) != 0) { 431 bufferedIn.reset(); 432 } else { 433 this.isHeaderStripped = true; 434 // In case of BYBLOCK mode, we also want to strip off 435 // remaining two character of the header. 436 if (this.readMode == READ_MODE.BYBLOCK) { 437 actualRead = bufferedIn.read(headerBytes, 0, 438 SUB_HEADER_LEN); 439 if (actualRead != -1) { 440 this.isSubHeaderStripped = true; 441 } 442 } 443 } 444 } 445 } 446 447 if (bufferedIn == null) { 448 throw new IOException("Failed to read bzip2 stream."); 449 } 450 451 return bufferedIn; 452 453 }// end of method 454 455 public void close() throws IOException { 456 if (!needsReset) { 457 input.close(); 458 needsReset = true; 459 } 460 } 461 462 /** 463 * This method updates compressed stream position exactly when the 464 * client of this code has read off at least one byte passed any BZip2 465 * end of block marker. 466 * 467 * This mechanism is very helpful to deal with data level record 468 * boundaries. Please see constructor and next methods of 469 * org.apache.hadoop.mapred.LineRecordReader as an example usage of this 470 * feature. We elaborate it with an example in the following: 471 * 472 * Assume two different scenarios of the BZip2 compressed stream, where 473 * [m] represent end of block, \n is line delimiter and . represent compressed 474 * data. 475 * 476 * ............[m]......\n....... 477 * 478 * ..........\n[m]......\n....... 479 * 480 * Assume that end is right after [m]. In the first case the reading 481 * will stop at \n and there is no need to read one more line. (To see the 482 * reason of reading one more line in the next() method is explained in LineRecordReader.) 483 * While in the second example LineRecordReader needs to read one more line 484 * (till the second \n). Now since BZip2Codecs only update position 485 * at least one byte passed a maker, so it is straight forward to differentiate 486 * between the two cases mentioned. 487 * 488 */ 489 490 public int read(byte[] b, int off, int len) throws IOException { 491 if (needsReset) { 492 internalReset(); 493 } 494 495 int result = 0; 496 result = this.input.read(b, off, len); 497 if (result == BZip2Constants.END_OF_BLOCK) { 498 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE; 499 } 500 501 if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) { 502 result = this.input.read(b, off, off + 1); 503 // This is the precise time to update compressed stream position 504 // to the client of this code. 505 this.updatePos(true); 506 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; 507 } 508 509 return result; 510 511 } 512 513 public int read() throws IOException { 514 byte b[] = new byte[1]; 515 int result = this.read(b, 0, 1); 516 return (result < 0) ? result : (b[0] & 0xff); 517 } 518 519 private void internalReset() throws IOException { 520 if (needsReset) { 521 needsReset = false; 522 BufferedInputStream bufferedIn = readStreamHeader(); 523 input = new CBZip2InputStream(bufferedIn, this.readMode); 524 } 525 } 526 527 public void resetState() throws IOException { 528 // Cannot read from bufferedIn at this point because bufferedIn 529 // might not be ready 530 // yet, as in SequenceFile.Reader implementation. 531 needsReset = true; 532 } 533 534 public long getPos() { 535 return this.compressedStreamPosition; 536 } 537 538 /* 539 * As the comments before read method tell that 540 * compressed stream is advertised when at least 541 * one byte passed EOB have been read off. But 542 * there is an exception to this rule. When we 543 * construct the stream we advertise the position 544 * exactly at EOB. In the following method 545 * shouldAddOn boolean captures this exception. 546 * 547 */ 548 private void updatePos(boolean shouldAddOn) { 549 int addOn = shouldAddOn ? 1 : 0; 550 this.compressedStreamPosition = this.startingPos 551 + this.input.getProcessedByteCount() + addOn; 552 } 553 554 }// end of BZip2CompressionInputStream 555 556}