/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;

/**
 * This class provides output and input streams for bzip2 compression
 * and decompression.  It uses the native bzip2 library on the system
 * if possible, else it uses a pure-Java implementation of the bzip2
 * algorithm.  The configuration parameter
 * io.compression.codec.bzip2.library can be used to control this
 * behavior.
 *
 * In the pure-Java mode, the Compressor and Decompressor interfaces
 * are not implemented.  Therefore, in that mode, those methods of
 * CompressionCodec which have a Compressor or Decompressor type
 * argument, throw UnsupportedOperationException.
 *
 * Currently, support for splittability is available only in the
 * pure-Java mode; therefore, if a SplitCompressionInputStream is
 * requested, the pure-Java implementation is used, regardless of the
 * setting of the configuration parameter mentioned above.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BZip2Codec implements Configurable, SplittableCompressionCodec {

  /** Magic characters that open every bzip2 stream. */
  private static final String HEADER = "BZ";
  private static final int HEADER_LEN = HEADER.length();
  /** The two characters that follow the magic in the stream header. */
  private static final String SUB_HEADER = "h9";
  private static final int SUB_HEADER_LEN = SUB_HEADER.length();
  /**
   * Charset used to convert the stream header to and from bytes, so that
   * the on-disk magic does not depend on the platform default charset.
   */
  private static final String HEADER_CHARSET = "UTF-8";

  private Configuration conf;

  /**
   * Set the configuration to be used by this object.
   *
   * @param conf the configuration object.
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Return the configuration used by this object.
   *
   * @return the configuration object used by this object.
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Creates a new instance of BZip2Codec.
   */
  public BZip2Codec() { }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream}, using a compressor obtained from
   * {@link #createCompressor()}.
   *
   * @param out the location for the final output stream
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException if the stream cannot be created
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    return createOutputStream(out, createCompressor());
  }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream} with the given {@link Compressor}.
   *
   * When the native bzip2 library is loaded the compressor is wrapped in a
   * {@link CompressorStream} whose buffer size comes from
   * <code>io.file.buffer.size</code> (default 4 KB); otherwise the pure-Java
   * stream is returned and <code>compressor</code> is not used.
   *
   * @param out        the location for the final output stream
   * @param compressor compressor to use
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException if the stream cannot be created
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out,
      Compressor compressor) throws IOException {
    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
      new CompressorStream(out, compressor,
                           conf.getInt("io.file.buffer.size", 4*1024)) :
      new BZip2CompressionOutputStream(out);
  }

  /**
   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of compressor needed by this codec.
   */
  @Override
  public Class<? extends Compressor> getCompressorType() {
    return Bzip2Factory.getBzip2CompressorType(conf);
  }

  /**
   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
   *
   * @return a new compressor for use by this codec
   */
  @Override
  public Compressor createCompressor() {
    return Bzip2Factory.getBzip2Compressor(conf);
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * input stream and return a stream for uncompressed data, using a
   * decompressor obtained from {@link #createDecompressor()}.
   *
   * @param in the stream to read compressed bytes from
   * @return a stream to read uncompressed bytes from
   * @throws IOException if the stream cannot be created
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in)
      throws IOException {
    return createInputStream(in, createDecompressor());
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * {@link InputStream} with the given {@link Decompressor}, and return a
   * stream for uncompressed data.
   *
   * When the native bzip2 library is loaded the decompressor is wrapped in a
   * {@link DecompressorStream} whose buffer size comes from
   * <code>io.file.buffer.size</code> (default 4 KB); otherwise the pure-Java
   * stream is returned and <code>decompressor</code> is not used.
   *
   * @param in           the stream to read compressed bytes from
   * @param decompressor decompressor to use
   * @return a stream to read uncompressed bytes from
   * @throws IOException if the stream cannot be created
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in,
      Decompressor decompressor) throws IOException {
    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
      new DecompressorStream(in, decompressor,
                             conf.getInt("io.file.buffer.size", 4*1024)) :
      new BZip2CompressionInputStream(in);
  }

  /**
   * Creates CompressionInputStream to be used to read off uncompressed data
   * in one of the two reading modes, i.e. Continuous or Blocked reading modes.
   *
   * @param seekableIn   The InputStream; it must also implement
   *                     {@link Seekable}
   * @param decompressor not used by this method; the pure-Java stream is
   *                     always created here
   * @param start        The start offset into the compressed stream
   * @param end          The end offset into the compressed stream
   * @param readMode     Controls whether progress is reported continuously or
   *                     only at block boundaries.
   *
   * @return CompressionInputStream for BZip2 aligned at block boundaries
   * @throws IOException if <code>seekableIn</code> is not {@link Seekable},
   *                     or if seeking or reading the stream fails
   */
  public SplitCompressionInputStream createInputStream(InputStream seekableIn,
      Decompressor decompressor, long start, long end, READ_MODE readMode)
      throws IOException {

    if (!(seekableIn instanceof Seekable)) {
      throw new IOException("seekableIn must be an instance of " +
          Seekable.class.getName());
    }

    // find the position of first BZip2 start up marker
    ((Seekable)seekableIn).seek(0);

    // BZip2 start of block markers are of 6 bytes.  But the very first block
    // also has "BZh9", making it 10 bytes.  This is the common case.  But at
    // times the stream might start without a leading BZ.
    final long firstBlockMarkerPosition =
      CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
    long adjStart = Math.max(0L, start - firstBlockMarkerPosition);

    ((Seekable)seekableIn).seek(adjStart);
    SplitCompressionInputStream in =
      new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);


    // The following if clause handles the following case:
    // Assume the following scenario in BZip2 compressed stream where
    // . represent compressed data.
    // .....[48 bit Block].....[48 bit   Block].....[48 bit Block]...
    // ........................[47 bits][1 bit].....[48 bit Block]...
    // ................................^[Assume a Byte alignment here]
    // ........................................^^[current position of stream]
    // .....................^^[We go back 10 Bytes in stream and find a Block marker]
    // ........................................^^[We align at wrong position!]
    // ...........................................................^^[While this pos is correct]

    if (in.getPos() <= start) {
      ((Seekable)seekableIn).seek(start);
      in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
    }

    return in;
  }

  /**
   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of decompressor needed by this codec.
   */
  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return Bzip2Factory.getBzip2DecompressorType(conf);
  }

  /**
   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
   *
   * @return a new decompressor for use by this codec
   */
  @Override
  public Decompressor createDecompressor() {
    return Bzip2Factory.getBzip2Decompressor(conf);
  }

  /**
   * .bz2 is recognized as the default extension for compressed BZip2 files
   *
   * @return A String telling the default bzip2 file extension
   */
  @Override
  public String getDefaultExtension() {
    return ".bz2";
  }

  /**
   * Pure-Java bzip2 output stream.  The "BZ" header and the underlying
   * {@link CBZip2OutputStream} are created lazily, on the first write (or on
   * finish/close if nothing was written), and again after every
   * {@link #resetState()}.
   */
  private static class BZip2CompressionOutputStream extends
      CompressionOutputStream {

    // class data starts here//
    private CBZip2OutputStream output;
    // true when a new header + CBZip2OutputStream must be started before
    // the next byte is written
    private boolean needsReset;
    // class data ends here//

    public BZip2CompressionOutputStream(OutputStream out)
        throws IOException {
      super(out);
      needsReset = true;
    }

    private void writeStreamHeader() throws IOException {
      if (super.out != null) {
        // The compressed bzip2 stream should start with the
        // identifying characters BZ. Caller of CBZip2OutputStream
        // i.e. this class must write these characters.
        // An explicit charset keeps the magic bytes independent of the
        // platform default encoding.
        out.write(HEADER.getBytes(HEADER_CHARSET));
      }
    }

    public void finish() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.finish();
      needsReset = true;
    }

    /** Writes the stream header and starts a fresh compressor if needed. */
    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        writeStreamHeader();
        this.output = new CBZip2OutputStream(out);
      }
    }

    public void resetState() throws IOException {
      // Cannot write to out at this point because out might not be ready
      // yet, as in SequenceFile.Writer implementation.
      needsReset = true;
    }

    public void write(int b) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b);
    }

    public void write(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b, off, len);
    }

    public void close() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.flush();
      this.output.close();
      needsReset = true;
    }

  }// end of class BZip2CompressionOutputStream

  /**
   * This class is capable of de-compressing BZip2 data in two modes:
   * CONTINUOUS and BYBLOCK.  BYBLOCK mode makes it possible to
   * do decompression starting at any arbitrary position in the stream.
   *
   * So this facility can easily be used to parallelize decompression
   * of a large BZip2 file for performance reasons.  (It is exactly
   * done so for Hadoop framework.  See LineRecordReader for an
   * example).  So one can break the file (of course logically) into
   * chunks for parallel processing.  These "splits" should be like
   * default Hadoop splits (e.g. as in FileInputFormat getSplit method).
   * So this code is designed and tested for FileInputFormat's way
   * of splitting only.
   */

  private static class BZip2CompressionInputStream extends
      SplitCompressionInputStream {

    // class data starts here//
    private CBZip2InputStream input;
    // true when the header must be re-read and a new decoder created
    // before the next read
    private boolean needsReset;
    private BufferedInputStream bufferedIn;
    private boolean isHeaderStripped = false;
    private boolean isSubHeaderStripped = false;
    private READ_MODE readMode = READ_MODE.CONTINUOUS;
    private long startingPos = 0L;

    // Following state machine handles different states of compressed stream
    // position
    // HOLD : Don't advertise compressed stream position
    // ADVERTISE : Read 1 more character and advertise stream position
    // See more comments about it before updatePos method.
    private enum POS_ADVERTISEMENT_STATE_MACHINE {
      HOLD, ADVERTISE
    }

    private POS_ADVERTISEMENT_STATE_MACHINE posSM =
        POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
    private long compressedStreamPosition = 0;

    // class data ends here//

    public BZip2CompressionInputStream(InputStream in) throws IOException {
      this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
    }

    public BZip2CompressionInputStream(InputStream in, long start, long end,
        READ_MODE readMode) throws IOException {
      super(in, start, end);
      needsReset = false;
      bufferedIn = new BufferedInputStream(super.in);
      this.startingPos = super.getPos();
      this.readMode = readMode;
      if (this.startingPos == 0) {
        // We only strip header if it is start of file
        bufferedIn = readStreamHeader();
      }
      input = new CBZip2InputStream(bufferedIn, readMode);
      // Report to the decoder the header bytes that were consumed here,
      // outside of it.
      if (this.isHeaderStripped) {
        input.updateReportedByteCount(HEADER_LEN);
      }

      if (this.isSubHeaderStripped) {
        input.updateReportedByteCount(SUB_HEADER_LEN);
      }

      this.updatePos(false);
    }

    /**
     * Consumes the leading "BZ" header from {@link #bufferedIn} if it is
     * present (and, in BYBLOCK mode, the two characters that follow it),
     * recording what was stripped in {@link #isHeaderStripped} and
     * {@link #isSubHeaderStripped}.  If the stream does not start with the
     * header, the stream is rewound to where it was.
     *
     * @return the buffered stream positioned after any stripped header
     * @throws IOException if reading fails or the buffered stream is missing
     */
    private BufferedInputStream readStreamHeader() throws IOException {
      // We are flexible enough to allow the compressed stream not to
      // start with the header of BZ. So it works fine either we have
      // the header or not.
      if (super.in != null) {
        bufferedIn.mark(HEADER_LEN);
        byte[] headerBytes = new byte[HEADER_LEN];
        int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
        if (actualRead != -1) {
          // Decode with an explicit charset so the comparison does not
          // depend on the platform default encoding.
          String header = new String(headerBytes, HEADER_CHARSET);
          if (header.compareTo(HEADER) != 0) {
            bufferedIn.reset();
          } else {
            this.isHeaderStripped = true;
            // In case of BYBLOCK mode, we also want to strip off
            // remaining two character of the header.
            if (this.readMode == READ_MODE.BYBLOCK) {
              actualRead = bufferedIn.read(headerBytes, 0,
                  SUB_HEADER_LEN);
              if (actualRead != -1) {
                this.isSubHeaderStripped = true;
              }
            }
          }
        }
      }

      if (bufferedIn == null) {
        throw new IOException("Failed to read bzip2 stream.");
      }

      return bufferedIn;

    }// end of method

    public void close() throws IOException {
      if (!needsReset) {
        input.close();
        needsReset = true;
      }
    }

    /**
     * Reads decompressed bytes, and updates the compressed stream position
     * exactly when the client of this code has read off at least one byte
     * past any BZip2 end of block marker.
     *
     * This mechanism is very helpful to deal with data level record
     * boundaries.  Please see constructor and next methods of
     * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
     * feature.  We elaborate it with an example in the following:
     *
     * Assume two different scenarios of the BZip2 compressed stream, where
     * [m] represent end of block, \n is line delimiter and . represent compressed
     * data.
     *
     * ............[m]......\n.......
     *
     * ..........\n[m]......\n.......
     *
     * Assume that end is right after [m].  In the first case the reading
     * will stop at \n and there is no need to read one more line.  (The
     * reason for reading one more line in the next() method is explained in
     * LineRecordReader.)
     * While in the second example LineRecordReader needs to read one more line
     * (till the second \n).  Now since BZip2Codecs only update position
     * at least one byte past a marker, it is straightforward to differentiate
     * between the two cases mentioned.
     *
     * @param b   destination buffer
     * @param off offset in <code>b</code> at which to store the data
     * @param len maximum number of bytes to read
     * @return the value returned by the underlying decoder for the last read
     *         issued (a byte count, or a negative end marker)
     * @throws IOException if the underlying read fails
     */

    public int read(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }

      int result = this.input.read(b, off, len);
      if (result == BZip2Constants.END_OF_BLOCK) {
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
      }

      if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
        // Read exactly one byte past the block marker.  The third argument
        // is a length, not an end index: asking for (off + 1) bytes could
        // return more than the caller's len, or overrun b when off > 0.
        result = this.input.read(b, off, 1);
        // This is the precise time to update compressed stream position
        // to the client of this code.
        this.updatePos(true);
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
      }

      return result;

    }

    /**
     * Reads a single byte via {@link #read(byte[], int, int)}.
     *
     * @return the byte as an unsigned value, or the negative result of the
     *         underlying read
     * @throws IOException if the underlying read fails
     */
    public int read() throws IOException {
      byte b[] = new byte[1];
      int result = this.read(b, 0, 1);
      return (result < 0) ? result : (b[0] & 0xff);
    }

    /** Re-reads the stream header and starts a fresh decoder if needed. */
    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        // readStreamHeader() returns the bufferedIn field itself.
        input = new CBZip2InputStream(readStreamHeader(), this.readMode);
      }
    }

    public void resetState() throws IOException {
      // Cannot read from bufferedIn at this point because bufferedIn
      // might not be ready
      // yet, as in SequenceFile.Reader implementation.
      needsReset = true;
    }

    /**
     * @return the last advertised position in the compressed stream
     */
    public long getPos() {
      return this.compressedStreamPosition;
    }

    /*
     * As the comments before read method tell that
     * compressed stream is advertised when at least
     * one byte past EOB has been read off.  But
     * there is an exception to this rule.  When we
     * construct the stream we advertise the position
     * exactly at EOB.  In the following method
     * shouldAddOn boolean captures this exception.
     *
     */
    private void updatePos(boolean shouldAddOn) {
      int addOn = shouldAddOn ? 1 : 0;
      this.compressedStreamPosition = this.startingPos
          + this.input.getProcessedByteCount() + addOn;
    }

  }// end of BZip2CompressionInputStream

}