/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;

/**
 * This class provides output and input streams for bzip2 compression
 * and decompression. It uses the native bzip2 library on the system
 * if possible, else it uses a pure-Java implementation of the bzip2
 * algorithm. The configuration parameter
 * io.compression.codec.bzip2.library can be used to control this
 * behavior.
 *
 * In the pure-Java mode, the Compressor and Decompressor interfaces
 * are not implemented. Therefore, in that mode, those methods of
 * CompressionCodec which have a Compressor or Decompressor type
 * argument, throw UnsupportedOperationException.
 *
 * Currently, support for splittability is available only in the
 * pure-Java mode; therefore, if a SplitCompressionInputStream is
 * requested, the pure-Java implementation is used, regardless of the
 * setting of the configuration parameter mentioned above.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BZip2Codec implements Configurable, SplittableCompressionCodec {

  /** Magic characters that open every bzip2 stream. */
  private static final String HEADER = "BZ";
  private static final int HEADER_LEN = HEADER.length();
  /** Characters that follow the magic in the stream header ("BZh9"). */
  private static final String SUB_HEADER = "h9";
  private static final int SUB_HEADER_LEN = SUB_HEADER.length();

  private Configuration conf;

  /**
   * Set the configuration to be used by this object.
   *
   * @param conf the configuration object.
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Return the configuration used by this object.
   *
   * @return the configuration object used by this object.
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Creates a new instance of BZip2Codec.
   */
  public BZip2Codec() { }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream}.
   *
   * @param out the location for the final output stream
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException if the stream cannot be created
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    return CompressionCodec.Util.
        createOutputStreamWithCodecPool(this, conf, out);
  }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream} with the given {@link Compressor}.
   *
   * The compressor is only used when the native bzip2 library is loaded;
   * otherwise the pure-Java output stream is returned and the compressor
   * argument is ignored.
   *
   * @param out the location for the final output stream
   * @param compressor compressor to use
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException if the stream cannot be created
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out,
      Compressor compressor) throws IOException {
    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
      new CompressorStream(out, compressor,
                           conf.getInt("io.file.buffer.size", 4*1024)) :
      new BZip2CompressionOutputStream(out);
  }

  /**
   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of compressor needed by this codec.
   */
  @Override
  public Class<? extends Compressor> getCompressorType() {
    return Bzip2Factory.getBzip2CompressorType(conf);
  }

  /**
   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
   *
   * @return a new compressor for use by this codec
   */
  @Override
  public Compressor createCompressor() {
    return Bzip2Factory.getBzip2Compressor(conf);
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * input stream and return a stream for uncompressed data.
   *
   * @param in the stream to read compressed bytes from
   * @return a stream to read uncompressed bytes from
   * @throws IOException if the stream cannot be created
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in)
      throws IOException {
    return CompressionCodec.Util.
        createInputStreamWithCodecPool(this, conf, in);
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * {@link InputStream} with the given {@link Decompressor}, and return a
   * stream for uncompressed data.
   *
   * The decompressor is only used when the native bzip2 library is loaded;
   * otherwise the pure-Java input stream is returned and the decompressor
   * argument is ignored.
   *
   * @param in the stream to read compressed bytes from
   * @param decompressor decompressor to use
   * @return a stream to read uncompressed bytes from
   * @throws IOException if the stream cannot be created
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in,
      Decompressor decompressor) throws IOException {
    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
      new DecompressorStream(in, decompressor,
                             conf.getInt("io.file.buffer.size", 4*1024)) :
      new BZip2CompressionInputStream(in);
  }

  /**
   * Creates CompressionInputStream to be used to read off uncompressed data
   * in one of the two reading modes, i.e. Continuous or Blocked reading modes.
   *
   * @param seekableIn The InputStream; must also implement {@link Seekable}
   * @param decompressor not used by this implementation
   * @param start The start offset into the compressed stream
   * @param end The end offset into the compressed stream
   * @param readMode Controls whether progress is reported continuously or
   *                 only at block boundaries.
   *
   * @return CompressionInputStream for BZip2 aligned at block boundaries
   * @throws IOException if seekableIn is not {@link Seekable}, or if seeking
   *         or reading the underlying stream fails
   */
  @Override
  public SplitCompressionInputStream createInputStream(InputStream seekableIn,
      Decompressor decompressor, long start, long end, READ_MODE readMode)
      throws IOException {

    if (!(seekableIn instanceof Seekable)) {
      throw new IOException("seekableIn must be an instance of " +
          Seekable.class.getName());
    }
    final Seekable seekable = (Seekable) seekableIn;

    // find the position of first BZip2 start up marker
    seekable.seek(0);

    // BZip2 start of block markers are of 6 bytes. But the very first block
    // also has "BZh9", making it 10 bytes. This is the common case. But at
    // times the stream might start without a leading BZ.
    final long firstBlockMarkerPosition =
        CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
    long adjStart = 0L;
    if (start != 0) {
      // Other than the first of file, the marker size is 6 bytes.
      adjStart = Math.max(0L, start - (firstBlockMarkerPosition
          - (HEADER_LEN + SUB_HEADER_LEN)));
    }

    seekable.seek(adjStart);
    SplitCompressionInputStream in =
        new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);

    // The following if clause handles the following case:
    // Assume the following scenario in BZip2 compressed stream where
    // . represent compressed data.
    // .....[48 bit Block].....[48 bit Block].....[48 bit Block]...
    // ........................[47 bits][1 bit].....[48 bit Block]...
    // ................................^[Assume a Byte alignment here]
    // ........................................^^[current position of stream]
    // .....................^^[We go back 10 Bytes in stream and find a Block marker]
    // ........................................^^[We align at wrong position!]
    // ...........................................................^^[While this pos is correct]

    if (in.getPos() < start) {
      seekable.seek(start);
      in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
    }

    return in;
  }

  /**
   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of decompressor needed by this codec.
   */
  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return Bzip2Factory.getBzip2DecompressorType(conf);
  }

  /**
   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
   *
   * @return a new decompressor for use by this codec
   */
  @Override
  public Decompressor createDecompressor() {
    return Bzip2Factory.getBzip2Decompressor(conf);
  }

  /**
   * .bz2 is recognized as the default extension for compressed BZip2 files.
   *
   * @return A String telling the default bzip2 file extension
   */
  @Override
  public String getDefaultExtension() {
    return ".bz2";
  }

  /**
   * Pure-Java bzip2 output stream. The "BZ" stream header and the wrapped
   * {@link CBZip2OutputStream} are created lazily, on the first write or on
   * {@link #finish()}, whenever {@code needsReset} is set.
   */
  private static class BZip2CompressionOutputStream extends
      CompressionOutputStream {

    // class data starts here//
    private CBZip2OutputStream output;
    private boolean needsReset;
    // class data ends here//

    public BZip2CompressionOutputStream(OutputStream out)
        throws IOException {
      super(out);
      needsReset = true;
    }

    private void writeStreamHeader() throws IOException {
      if (super.out != null) {
        // The compressed bzip2 stream should start with the
        // identifying characters BZ. Caller of CBZip2OutputStream
        // i.e. this class must write these characters.
        out.write(HEADER.getBytes(Charsets.UTF_8));
      }
    }

    @Override
    public void finish() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.finish();
      needsReset = true;
    }

    /** Writes the stream header and opens a fresh compressor if one is due. */
    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        writeStreamHeader();
        this.output = new CBZip2OutputStream(out);
      }
    }

    @Override
    public void resetState() throws IOException {
      // Cannot write to out at this point because out might not be ready
      // yet, as in SequenceFile.Writer implementation.
      needsReset = true;
    }

    @Override
    public void write(int b) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
      try {
        super.close();
      } finally {
        // output is created lazily; if super.close() failed before it was
        // created, dereferencing it here would throw a NullPointerException
        // that hides the original failure.
        if (output != null) {
          output.close();
        }
      }
    }

  }// end of class BZip2CompressionOutputStream

  /**
   * This class is capable to de-compress BZip2 data in two modes;
   * CONTINUOUS and BYBLOCK. BYBLOCK mode makes it possible to
   * do decompression starting any arbitrary position in the stream.
   *
   * So this facility can easily be used to parallelize decompression
   * of a large BZip2 file for performance reasons. (It is exactly
   * done so for Hadoop framework. See LineRecordReader for an
   * example). So one can break the file (of course logically) into
   * chunks for parallel processing. These "splits" should be like
   * default Hadoop splits (e.g. as in FileInputFormat getSplit method).
   * So this code is designed and tested for FileInputFormat's way
   * of splitting only.
   */
  private static class BZip2CompressionInputStream extends
      SplitCompressionInputStream {

    // class data starts here//
    private CBZip2InputStream input;
    private boolean needsReset;
    private BufferedInputStream bufferedIn;
    private boolean isHeaderStripped = false;
    private boolean isSubHeaderStripped = false;
    private READ_MODE readMode = READ_MODE.CONTINUOUS;
    private long startingPos = 0L;

    // Following state machine handles different states of compressed stream
    // position
    // HOLD : Don't advertise compressed stream position
    // ADVERTISE : Read 1 more character and advertise stream position
    // See more comments about it before updatePos method.
    private enum POS_ADVERTISEMENT_STATE_MACHINE {
      HOLD, ADVERTISE
    }

    private POS_ADVERTISEMENT_STATE_MACHINE posSM =
        POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
    private long compressedStreamPosition = 0;

    // class data ends here//

    public BZip2CompressionInputStream(InputStream in) throws IOException {
      this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
    }

    public BZip2CompressionInputStream(InputStream in, long start, long end,
        READ_MODE readMode) throws IOException {
      super(in, start, end);
      needsReset = false;
      bufferedIn = new BufferedInputStream(super.in);
      this.startingPos = super.getPos();
      this.readMode = readMode;
      if (this.startingPos == 0) {
        // We only strip header if it is start of file
        bufferedIn = readStreamHeader();
      }
      input = new CBZip2InputStream(bufferedIn, readMode);
      // Account for the header bytes consumed here rather than by the
      // decompressor, so that the reported byte count includes them.
      if (this.isHeaderStripped) {
        input.updateReportedByteCount(HEADER_LEN);
      }

      if (this.isSubHeaderStripped) {
        input.updateReportedByteCount(SUB_HEADER_LEN);
      }

      this.updatePos(false);
    }

    /**
     * Consumes the leading "BZ" header if present (and, in BYBLOCK mode, the
     * two characters that follow it); otherwise rewinds the buffered stream
     * to where it was.
     *
     * @return the buffered stream positioned after any stripped header
     * @throws IOException if reading or resetting the buffered stream fails
     */
    private BufferedInputStream readStreamHeader() throws IOException {
      // We are flexible enough to allow the compressed stream not to
      // start with the header of BZ. So it works fine either we have
      // the header or not.
      if (super.in != null) {
        bufferedIn.mark(HEADER_LEN);
        byte[] headerBytes = new byte[HEADER_LEN];
        int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
        if (actualRead != -1) {
          String header = new String(headerBytes, Charsets.UTF_8);
          if (!HEADER.equals(header)) {
            bufferedIn.reset();
          } else {
            this.isHeaderStripped = true;
            // In case of BYBLOCK mode, we also want to strip off
            // remaining two character of the header.
            if (this.readMode == READ_MODE.BYBLOCK) {
              actualRead = bufferedIn.read(headerBytes, 0,
                  SUB_HEADER_LEN);
              if (actualRead != -1) {
                this.isSubHeaderStripped = true;
              }
            }
          }
        }
      }

      if (bufferedIn == null) {
        throw new IOException("Failed to read bzip2 stream.");
      }

      return bufferedIn;

    }// end of method

    @Override
    public void close() throws IOException {
      if (!needsReset) {
        try {
          input.close();
          needsReset = true;
        } finally {
          super.close();
        }
      }
    }

    /**
     * This method updates compressed stream position exactly when the
     * client of this code has read off at least one byte past any BZip2
     * end of block marker.
     *
     * This mechanism is very helpful to deal with data level record
     * boundaries. Please see constructor and next methods of
     * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
     * feature. We elaborate it with an example in the following:
     *
     * Assume two different scenarios of the BZip2 compressed stream, where
     * [m] represent end of block, \n is line delimiter and . represent compressed
     * data.
     *
     * ............[m]......\n.......
     *
     * ..........\n[m]......\n.......
     *
     * Assume that end is right after [m]. In the first case the reading
     * will stop at \n and there is no need to read one more line. (The
     * reason for reading one more line in the next() method is explained in
     * LineRecordReader.) While in the second example LineRecordReader needs
     * to read one more line (till the second \n). Now since BZip2Codecs only
     * update position at least one byte past a marker, it is straightforward
     * to differentiate between the two cases mentioned.
     *
     * @param b destination buffer
     * @param off offset in {@code b} at which data is stored
     * @param len maximum number of bytes to read
     * @return the value returned by the underlying {@link CBZip2InputStream}
     *         read; after an end-of-block marker this is the result of
     *         reading one further byte
     * @throws IOException if the underlying read fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }

      int result = this.input.read(b, off, len);
      if (result == BZip2Constants.END_OF_BLOCK) {
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
      }

      if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
        // Read exactly one byte past the marker. The third argument is a
        // length, not an end index, so it must be 1 regardless of off;
        // updatePos(true) below accounts for precisely that one byte.
        result = this.input.read(b, off, 1);
        // This is the precise time to update compressed stream position
        // to the client of this code.
        this.updatePos(true);
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
      }

      return result;

    }

    @Override
    public int read() throws IOException {
      byte[] b = new byte[1];
      int result = this.read(b, 0, 1);
      return (result < 0) ? result : (b[0] & 0xff);
    }

    /** Re-reads the stream header and opens a fresh decompressor if due. */
    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        input = new CBZip2InputStream(readStreamHeader(), this.readMode);
      }
    }

    @Override
    public void resetState() throws IOException {
      // Cannot read from bufferedIn at this point because bufferedIn
      // might not be ready
      // yet, as in SequenceFile.Reader implementation.
      needsReset = true;
    }

    @Override
    public long getPos() {
      return this.compressedStreamPosition;
    }

    /*
     * As the comments before read method tell that
     * compressed stream is advertised when at least
     * one byte past EOB has been read off. But
     * there is an exception to this rule. When we
     * construct the stream we advertise the position
     * exactly at EOB. In the following method
     * shouldAddOn boolean captures this exception.
     *
     */
    private void updatePos(boolean shouldAddOn) {
      int addOn = shouldAddOn ? 1 : 0;
      this.compressedStreamPosition = this.startingPos
          + this.input.getProcessedByteCount() + addOn;
    }

  }// end of BZip2CompressionInputStream

}