/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;

/**
 * This class provides output and input streams for bzip2 compression
 * and decompression.  It uses the native bzip2 library on the system
 * if possible, else it uses a pure-Java implementation of the bzip2
 * algorithm.  The configuration parameter
 * io.compression.codec.bzip2.library can be used to control this
 * behavior.
 *
 * In the pure-Java mode, the Compressor and Decompressor interfaces
 * are not implemented.  Therefore, in that mode, those methods of
 * CompressionCodec which have a Compressor or Decompressor type
 * argument, throw UnsupportedOperationException.
 *
 * Currently, support for splittability is available only in the
 * pure-Java mode; therefore, if a SplitCompressionInputStream is
 * requested, the pure-Java implementation is used, regardless of the
 * setting of the configuration parameter mentioned above.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BZip2Codec implements Configurable, SplittableCompressionCodec {

  /** Magic characters that identify the start of a bzip2 stream. */
  private static final String HEADER = "BZ";
  private static final int HEADER_LEN = HEADER.length();
  /** Remaining two characters of the four character "BZh9" stream header. */
  private static final String SUB_HEADER = "h9";
  private static final int SUB_HEADER_LEN = SUB_HEADER.length();
  /**
   * Charset used to convert the stream header to and from bytes.  It is
   * named explicitly so that the header does not depend on the JVM's
   * platform-default charset.
   */
  private static final String HEADER_CHARSET = "UTF-8";

  private Configuration conf;

  /**
   * Set the configuration to be used by this object.
   *
   * @param conf the configuration object.
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Return the configuration used by this object.
   *
   * @return the configuration object used by this object.
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Creates a new instance of BZip2Codec.
   */
  public BZip2Codec() { }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream}.
   *
   * @param out the location for the final output stream
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException if the underlying stream cannot be created
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    return CompressionCodec.Util.
        createOutputStreamWithCodecPool(this, conf, out);
  }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream} with the given {@link Compressor}.
   *
   * When the native bzip2 library is loaded the compressor is wrapped in a
   * {@link CompressorStream} whose buffer size is taken from
   * io.file.buffer.size (default 4 KiB); otherwise the pure-Java stream is
   * returned and the compressor argument is not used.
   *
   * @param out        the location for the final output stream
   * @param compressor compressor to use
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException if the underlying stream cannot be created
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out,
      Compressor compressor) throws IOException {
    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
      new CompressorStream(out, compressor,
                           conf.getInt("io.file.buffer.size", 4*1024)) :
      new BZip2CompressionOutputStream(out);
  }

  /**
   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of compressor needed by this codec.
   */
  @Override
  public Class<? extends Compressor> getCompressorType() {
    return Bzip2Factory.getBzip2CompressorType(conf);
  }

  /**
   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
   *
   * @return a new compressor for use by this codec
   */
  @Override
  public Compressor createCompressor() {
    return Bzip2Factory.getBzip2Compressor(conf);
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * input stream and return a stream for uncompressed data.
   *
   * @param in the stream to read compressed bytes from
   * @return a stream to read uncompressed bytes from
   * @throws IOException if the underlying stream cannot be created
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in)
      throws IOException {
    return CompressionCodec.Util.
        createInputStreamWithCodecPool(this, conf, in);
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * {@link InputStream} with the given {@link Decompressor}, and return a
   * stream for uncompressed data.
   *
   * When the native bzip2 library is loaded the decompressor is wrapped in a
   * {@link DecompressorStream} whose buffer size is taken from
   * io.file.buffer.size (default 4 KiB); otherwise the pure-Java stream is
   * returned and the decompressor argument is not used.
   *
   * @param in           the stream to read compressed bytes from
   * @param decompressor decompressor to use
   * @return a stream to read uncompressed bytes from
   * @throws IOException if the underlying stream cannot be created
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in,
      Decompressor decompressor) throws IOException {
    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
      new DecompressorStream(in, decompressor,
                             conf.getInt("io.file.buffer.size", 4*1024)) :
      new BZip2CompressionInputStream(in);
  }

  /**
   * Creates a CompressionInputStream to be used to read off uncompressed data
   * in one of the two reading modes, i.e. Continuous or Blocked reading mode.
   *
   * @param seekableIn   The InputStream; it must also implement
   *                     {@link Seekable}
   * @param decompressor not used by this method; the pure-Java stream is
   *                     always created here
   * @param start        The start offset into the compressed stream
   * @param end          The end offset into the compressed stream
   * @param readMode     Controls whether progress is reported continuously or
   *                     only at block boundaries.
   *
   * @return CompressionInputStream for BZip2 aligned at block boundaries
   * @throws IOException if seekableIn is not an instance of {@link Seekable},
   *                     or if seeking or reading the stream fails
   */
  public SplitCompressionInputStream createInputStream(InputStream seekableIn,
      Decompressor decompressor, long start, long end, READ_MODE readMode)
      throws IOException {

    if (!(seekableIn instanceof Seekable)) {
      throw new IOException("seekableIn must be an instance of " +
          Seekable.class.getName());
    }

    // find the position of the first BZip2 start up marker
    ((Seekable)seekableIn).seek(0);

    // BZip2 start of block markers are of 6 bytes.  But the very first block
    // also has "BZh9", making it 10 bytes.  This is the common case.  But at
    // times the stream might start without a leading BZ.
    final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
      CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
    long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);

    ((Seekable)seekableIn).seek(adjStart);
    SplitCompressionInputStream in =
      new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);


    // The following if clause handles the following case:
    // Assume the following scenario in BZip2 compressed stream where
    // . represent compressed data.
    // .....[48 bit Block].....[48 bit   Block].....[48 bit Block]...
    // ........................[47 bits][1 bit].....[48 bit Block]...
    // ................................^[Assume a Byte alignment here]
    // ........................................^^[current position of stream]
    // .....................^^[We go back 10 Bytes in stream and find a Block marker]
    // ........................................^^[We align at wrong position!]
    // ...........................................................^^[While this pos is correct]

    if (in.getPos() <= start) {
      ((Seekable)seekableIn).seek(start);
      in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
    }

    return in;
  }

  /**
   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of decompressor needed by this codec.
   */
  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return Bzip2Factory.getBzip2DecompressorType(conf);
  }

  /**
   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
   *
   * @return a new decompressor for use by this codec
   */
  @Override
  public Decompressor createDecompressor() {
    return Bzip2Factory.getBzip2Decompressor(conf);
  }

  /**
   * .bz2 is recognized as the default extension for compressed BZip2 files
   *
   * @return A String telling the default bzip2 file extension
   */
  @Override
  public String getDefaultExtension() {
    return ".bz2";
  }

  /**
   * Pure-Java bzip2 output stream.  The "BZ" header and the wrapped
   * {@link CBZip2OutputStream} are created lazily, on the first write,
   * finish or close that follows construction or a call to
   * {@link #resetState()}.
   */
  private static class BZip2CompressionOutputStream extends
      CompressionOutputStream {

    // class data starts here//
    private CBZip2OutputStream output;
    // true when the header still has to be written and a fresh
    // CBZip2OutputStream created before the next use of the stream
    private boolean needsReset;
    // class data ends here//

    public BZip2CompressionOutputStream(OutputStream out)
        throws IOException {
      super(out);
      needsReset = true;
    }

    /**
     * Writes the "BZ" magic characters to the underlying stream, if there
     * is one.
     */
    private void writeStreamHeader() throws IOException {
      if (super.out != null) {
        // The compressed bzip2 stream should start with the
        // identifying characters BZ. Caller of CBZip2OutputStream
        // i.e. this class must write these characters.
        // The charset is named explicitly so the bytes written do not
        // depend on the platform-default charset.
        out.write(HEADER.getBytes(HEADER_CHARSET));
      }
    }

    public void finish() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.finish();
      needsReset = true;
    }

    /**
     * Writes the stream header and creates a new CBZip2OutputStream if a
     * reset is pending; does nothing otherwise.
     */
    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        writeStreamHeader();
        this.output = new CBZip2OutputStream(out);
      }
    }

    public void resetState() throws IOException {
      // Cannot write to out at this point because out might not be ready
      // yet, as in SequenceFile.Writer implementation.
      needsReset = true;
    }

    public void write(int b) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b);
    }

    public void write(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b, off, len);
    }

    public void close() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.flush();
      this.output.close();
      needsReset = true;
    }

  }// end of class BZip2CompressionOutputStream

  /**
   * This class is capable of de-compressing BZip2 data in two modes;
   * CONTINUOUS and BYBLOCK.  BYBLOCK mode makes it possible to
   * do decompression starting at any arbitrary position in the stream.
   *
   * So this facility can easily be used to parallelize decompression
   * of a large BZip2 file for performance reasons.  (It is exactly
   * done so for Hadoop framework.  See LineRecordReader for an
   * example).  So one can break the file (of course logically) into
   * chunks for parallel processing.  These "splits" should be like
   * default Hadoop splits (e.g. as in FileInputFormat getSplit method).
   * So this code is designed and tested for FileInputFormat's way
   * of splitting only.
   */

  private static class BZip2CompressionInputStream extends
      SplitCompressionInputStream {

    // class data starts here//
    private CBZip2InputStream input;
    // true when the header must be re-read and a fresh CBZip2InputStream
    // created before the next read
    boolean needsReset;
    private BufferedInputStream bufferedIn;
    private boolean isHeaderStripped = false;
    private boolean isSubHeaderStripped = false;
    private READ_MODE readMode = READ_MODE.CONTINUOUS;
    private long startingPos = 0L;

    // Following state machine handles different states of compressed stream
    // position
    // HOLD : Don't advertise compressed stream position
    // ADVERTISE : Read 1 more character and advertise stream position
    // See more comments about it before updatePos method.
    private enum POS_ADVERTISEMENT_STATE_MACHINE {
      HOLD, ADVERTISE
    }

    POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
    long compressedStreamPosition = 0;

    // class data ends here//

    public BZip2CompressionInputStream(InputStream in) throws IOException {
      this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
    }

    public BZip2CompressionInputStream(InputStream in, long start, long end,
        READ_MODE readMode) throws IOException {
      super(in, start, end);
      needsReset = false;
      bufferedIn = new BufferedInputStream(super.in);
      this.startingPos = super.getPos();
      this.readMode = readMode;
      if (this.startingPos == 0) {
        // We only strip header if it is start of file
        bufferedIn = readStreamHeader();
      }
      input = new CBZip2InputStream(bufferedIn, readMode);
      // Account for the header bytes that were consumed here rather than
      // by the CBZip2InputStream.
      if (this.isHeaderStripped) {
        input.updateReportedByteCount(HEADER_LEN);
      }

      if (this.isSubHeaderStripped) {
        input.updateReportedByteCount(SUB_HEADER_LEN);
      }

      this.updatePos(false);
    }

    /**
     * Consumes the "BZ" header from the buffered stream when it is present,
     * and, in BYBLOCK mode, the two characters that follow it.  When the
     * stream does not start with "BZ" the stream is rewound so that no bytes
     * are lost.
     *
     * @return the buffered stream, positioned after any stripped header
     * @throws IOException if reading fails or there is no buffered stream
     */
    private BufferedInputStream readStreamHeader() throws IOException {
      // We are flexible enough to allow the compressed stream not to
      // start with the header of BZ. So it works fine either we have
      // the header or not.
      if (super.in != null) {
        bufferedIn.mark(HEADER_LEN);
        byte[] headerBytes = new byte[HEADER_LEN];
        int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
        if (actualRead != -1) {
          // Decode with an explicit charset so the comparison does not
          // depend on the platform-default charset.
          String header = new String(headerBytes, HEADER_CHARSET);
          if (header.compareTo(HEADER) != 0) {
            bufferedIn.reset();
          } else {
            this.isHeaderStripped = true;
            // In case of BYBLOCK mode, we also want to strip off
            // remaining two character of the header.
            if (this.readMode == READ_MODE.BYBLOCK) {
              actualRead = bufferedIn.read(headerBytes, 0,
                  SUB_HEADER_LEN);
              if (actualRead != -1) {
                this.isSubHeaderStripped = true;
              }
            }
          }
        }
      }

      if (bufferedIn == null) {
        throw new IOException("Failed to read bzip2 stream.");
      }

      return bufferedIn;

    }// end of method

    public void close() throws IOException {
      if (!needsReset) {
        input.close();
        needsReset = true;
      }
    }

    /**
     * This method updates compressed stream position exactly when the
     * client of this code has read off at least one byte past any BZip2
     * end of block marker.
     *
     * This mechanism is very helpful to deal with data level record
     * boundaries.  Please see constructor and next methods of
     * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
     * feature.  We elaborate it with an example in the following:
     *
     * Assume two different scenarios of the BZip2 compressed stream, where
     * [m] represent end of block, \n is line delimiter and . represent compressed
     * data.
     *
     * ............[m]......\n.......
     *
     * ..........\n[m]......\n.......
     *
     * Assume that end is right after [m].  In the first case the reading
     * will stop at \n and there is no need to read one more line.  (The
     * reason for reading one more line in the next() method is explained in
     * LineRecordReader.)  While in the second example LineRecordReader needs
     * to read one more line (till the second \n).  Now since BZip2Codecs only
     * update position at least one byte past a marker, it is straightforward
     * to differentiate between the two cases mentioned.
     *
     */

    public int read(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }

      int result = this.input.read(b, off, len);
      if (result == BZip2Constants.END_OF_BLOCK) {
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
      }

      if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
        // Read exactly one more byte past the end of block marker.  The
        // third argument is a length, not an end index, so it must be 1
        // regardless of the value of off.
        result = this.input.read(b, off, 1);
        // This is the precise time to update compressed stream position
        // to the client of this code.
        this.updatePos(true);
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
      }

      return result;

    }

    public int read() throws IOException {
      byte[] b = new byte[1];
      int result = this.read(b, 0, 1);
      return (result < 0) ? result : (b[0] & 0xff);
    }

    /**
     * Re-reads the stream header and creates a new CBZip2InputStream if a
     * reset is pending; does nothing otherwise.
     */
    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        input = new CBZip2InputStream(readStreamHeader(), this.readMode);
      }
    }

    public void resetState() throws IOException {
      // Cannot read from bufferedIn at this point because bufferedIn
      // might not be ready
      // yet, as in SequenceFile.Reader implementation.
      needsReset = true;
    }

    public long getPos() {
      return this.compressedStreamPosition;
    }

    /*
     * As the comments before read method tell that
     * compressed stream is advertised when at least
     * one byte past EOB have been read off.  But
     * there is an exception to this rule.  When we
     * construct the stream we advertise the position
     * exactly at EOB.  In the following method
     * shouldAddOn boolean captures this exception.
     *
     */
    private void updatePos(boolean shouldAddOn) {
      int addOn = shouldAddOn ? 1 : 0;
      this.compressedStreamPosition = this.startingPos
          + this.input.getProcessedByteCount() + addOn;
    }

  }// end of BZip2CompressionInputStream

}