/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;

/**
 * This class provides output and input streams for bzip2 compression
 * and decompression.  It uses the native bzip2 library on the system
 * if possible, else it uses a pure-Java implementation of the bzip2
 * algorithm.  The configuration parameter
 * io.compression.codec.bzip2.library can be used to control this
 * behavior.
 *
 * In the pure-Java mode, the Compressor and Decompressor interfaces
 * are not implemented.  Therefore, in that mode, those methods of
 * CompressionCodec which have a Compressor or Decompressor type
 * argument throw UnsupportedOperationException.
 *
 * Currently, support for splittability is available only in the
 * pure-Java mode; therefore, if a SplitCompressionInputStream is
 * requested, the pure-Java implementation is used, regardless of the
 * setting of the configuration parameter mentioned above.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BZip2Codec implements Configurable, SplittableCompressionCodec {

  private static final String HEADER = "BZ";
  private static final int HEADER_LEN = HEADER.length();
  private static final String SUB_HEADER = "h9";
  private static final int SUB_HEADER_LEN = SUB_HEADER.length();

  private Configuration conf;

  /**
   * Set the configuration to be used by this object.
   *
   * @param conf the configuration object.
   */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Return the configuration used by this object.
   *
   * @return the configuration object used by this object.
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Creates a new instance of BZip2Codec.
   */
  public BZip2Codec() { }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream}.
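   *
   * <p>Illustrative sketch only (not taken from this file): {@code fs},
   * {@code path} and {@code bytes} are assumed to be supplied by the caller,
   * and the configuration value shown is an assumed setting for selecting the
   * native library; see the Hadoop documentation for the accepted values.
   * <pre>{@code
   *   Configuration conf = new Configuration();
   *   // Assumed value; "java-builtin" would force the pure-Java path.
   *   conf.set("io.compression.codec.bzip2.library", "system-native");
   *   BZip2Codec codec = new BZip2Codec();
   *   codec.setConf(conf);
   *   try (OutputStream raw = fs.create(path);
   *        CompressionOutputStream out = codec.createOutputStream(raw)) {
   *     out.write(bytes);   // uncompressed bytes in, .bz2 data out
   *   }
   * }</pre>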
   *
   * @param out the location for the final output stream
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    return CompressionCodec.Util.
        createOutputStreamWithCodecPool(this, conf, out);
  }

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream} with the given {@link Compressor}.
   *
   * @param out the location for the final output stream
   * @param compressor compressor to use
   * @return a stream the user can write uncompressed data to, to have it
   *         compressed
   * @throws IOException
   */
  @Override
  public CompressionOutputStream createOutputStream(OutputStream out,
      Compressor compressor) throws IOException {
    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
        new CompressorStream(out, compressor,
            conf.getInt("io.file.buffer.size", 4*1024)) :
        new BZip2CompressionOutputStream(out);
  }

  /**
   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of compressor needed by this codec.
   */
  @Override
  public Class<? extends Compressor> getCompressorType() {
    return Bzip2Factory.getBzip2CompressorType(conf);
  }

  /**
   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
   *
   * @return a new compressor for use by this codec
   */
  @Override
  public Compressor createCompressor() {
    return Bzip2Factory.getBzip2Compressor(conf);
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * input stream and return a stream for uncompressed data.
   *
   * @param in the stream to read compressed bytes from
   * @return a stream to read uncompressed bytes from
   * @throws IOException
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in)
      throws IOException {
    return CompressionCodec.Util.
        createInputStreamWithCodecPool(this, conf, in);
  }

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * {@link InputStream} with the given {@link Decompressor}, and return a
   * stream for uncompressed data.
   *
   * @param in the stream to read compressed bytes from
   * @param decompressor decompressor to use
   * @return a stream to read uncompressed bytes from
   * @throws IOException
   */
  @Override
  public CompressionInputStream createInputStream(InputStream in,
      Decompressor decompressor) throws IOException {
    return Bzip2Factory.isNativeBzip2Loaded(conf) ?
        new DecompressorStream(in, decompressor,
            conf.getInt("io.file.buffer.size", 4*1024)) :
        new BZip2CompressionInputStream(in);
  }

  /**
   * Creates a CompressionInputStream to be used to read off uncompressed data
   * in one of the two reading modes, i.e. continuous (CONTINUOUS) or blocked
   * (BYBLOCK) mode.
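   *
   * <p>Illustrative sketch only (not taken from this file): {@code fs},
   * {@code path}, {@code splitStart} and {@code splitEnd} are assumed to come
   * from the caller, typically an input split, and {@code codec} is a
   * configured BZip2Codec instance.
   * <pre>{@code
   *   FSDataInputStream raw = fs.open(path);   // FSDataInputStream is Seekable
   *   SplitCompressionInputStream in = codec.createInputStream(
   *       raw, codec.createDecompressor(), splitStart, splitEnd,
   *       SplittableCompressionCodec.READ_MODE.BYBLOCK);
   *   // in.getPos() reports the compressed-stream position, which a record
   *   // reader compares with splitEnd to decide when to stop reading.
   * }</pre>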
   *
   * @param seekableIn The InputStream
   * @param decompressor decompressor to use
   * @param start The start offset into the compressed stream
   * @param end The end offset into the compressed stream
   * @param readMode Controls whether progress is reported continuously or
   *                 only at block boundaries.
   *
   * @return CompressionInputStream for BZip2 aligned at block boundaries
   */
  public SplitCompressionInputStream createInputStream(InputStream seekableIn,
      Decompressor decompressor, long start, long end, READ_MODE readMode)
      throws IOException {

    if (!(seekableIn instanceof Seekable)) {
      throw new IOException("seekableIn must be an instance of " +
          Seekable.class.getName());
    }

    ((Seekable)seekableIn).seek(start);
    return new BZip2CompressionInputStream(seekableIn, start, end, readMode);
  }

  /**
   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
   *
   * @return the type of decompressor needed by this codec.
   */
  @Override
  public Class<? extends Decompressor> getDecompressorType() {
    return Bzip2Factory.getBzip2DecompressorType(conf);
  }

  /**
   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
   *
   * @return a new decompressor for use by this codec
   */
  @Override
  public Decompressor createDecompressor() {
    return Bzip2Factory.getBzip2Decompressor(conf);
  }

  /**
   * .bz2 is recognized as the default extension for compressed BZip2 files.
   *
   * @return the default bzip2 file extension
   */
  @Override
  public String getDefaultExtension() {
    return ".bz2";
  }

  private static class BZip2CompressionOutputStream extends
      CompressionOutputStream {

    // class data starts here//
    private CBZip2OutputStream output;
    private boolean needsReset;
    // class data ends here//

    public BZip2CompressionOutputStream(OutputStream out)
        throws IOException {
      super(out);
      needsReset = true;
    }

    private void writeStreamHeader() throws IOException {
      if (super.out != null) {
        // The compressed bzip2 stream should start with the
        // identifying characters BZ.  The caller of CBZip2OutputStream,
        // i.e. this class, must write these characters.
        out.write(HEADER.getBytes(Charsets.UTF_8));
      }
    }

    public void finish() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.finish();
      needsReset = true;
    }

    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        writeStreamHeader();
        this.output = new CBZip2OutputStream(out);
      }
    }

    public void resetState() throws IOException {
      // Cannot write to out at this point because out might not be ready
      // yet, as in the SequenceFile.Writer implementation.
      needsReset = true;
    }

    public void write(int b) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b);
    }

    public void write(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b, off, len);
    }

    public void close() throws IOException {
      try {
        super.close();
      } finally {
        output.close();
      }
    }

  }// end of class BZip2CompressionOutputStream

  /**
   * This class is capable of decompressing BZip2 data in two modes,
   * CONTINUOUS and BYBLOCK.  BYBLOCK mode makes it possible to
   * do decompression starting at any arbitrary position in the stream.
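   *
   * For illustration only (the figures below are made up, not from this
   * codec): with a 256 MB .bz2 file read as 64 MB splits, the reader for the
   * split [64 MB, 128 MB) roughly starts decoding at the first bzip2 block
   * boundary at or after 64 MB and keeps reading until the position it
   * reports first moves past 128 MB; the next split's reader does the same
   * from 128 MB, so each record is consumed by exactly one reader.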
   *
   * This facility can therefore easily be used to parallelize decompression
   * of a large BZip2 file for performance reasons.  (The Hadoop framework
   * does exactly this; see LineRecordReader for an example.)  So one can
   * break the file (logically) into chunks for parallel processing.  These
   * "splits" should be like the default Hadoop splits (e.g. as produced by
   * FileInputFormat's getSplits method).  This code is designed and tested
   * for FileInputFormat's way of splitting only.
   */
  private static class BZip2CompressionInputStream extends
      SplitCompressionInputStream {

    // class data starts here//
    private CBZip2InputStream input;
    boolean needsReset;
    private BufferedInputStream bufferedIn;
    private boolean isHeaderStripped = false;
    private boolean isSubHeaderStripped = false;
    private READ_MODE readMode = READ_MODE.CONTINUOUS;
    private long startingPos = 0L;

    // The following state machine handles the different states of the
    // compressed stream position:
    // HOLD : Don't advertise the compressed stream position
    // ADVERTISE : Read 1 more character and advertise the stream position
    // See more comments about it before the updatePos method.
    private enum POS_ADVERTISEMENT_STATE_MACHINE {
      HOLD, ADVERTISE
    };

    POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
    long compressedStreamPosition = 0;

    // class data ends here//

    public BZip2CompressionInputStream(InputStream in) throws IOException {
      this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
    }

    public BZip2CompressionInputStream(InputStream in, long start, long end,
        READ_MODE readMode) throws IOException {
      super(in, start, end);
      needsReset = false;
      bufferedIn = new BufferedInputStream(super.in);
      this.startingPos = super.getPos();
      this.readMode = readMode;
      long numSkipped = 0;
      if (this.startingPos == 0) {
        // We only strip the header if it is the start of the file
        bufferedIn = readStreamHeader();
      } else if (this.readMode == READ_MODE.BYBLOCK &&
          this.startingPos <= HEADER_LEN + SUB_HEADER_LEN) {
        // When we're in BYBLOCK mode and the start position is > 0
        // and <= HEADER_LEN + SUB_HEADER_LEN, we should skip to after
        // the start of the first bz2 block to avoid duplicated records
        numSkipped = HEADER_LEN + SUB_HEADER_LEN + 1 - this.startingPos;
        long skipBytes = numSkipped;
        while (skipBytes > 0) {
          long s = bufferedIn.skip(skipBytes);
          if (s > 0) {
            skipBytes -= s;
          } else {
            if (bufferedIn.read() == -1) {
              break; // end of the split
            } else {
              skipBytes--;
            }
          }
        }
      }
      input = new CBZip2InputStream(bufferedIn, readMode);
      if (this.isHeaderStripped) {
        input.updateReportedByteCount(HEADER_LEN);
      }

      if (this.isSubHeaderStripped) {
        input.updateReportedByteCount(SUB_HEADER_LEN);
      }

      if (numSkipped > 0) {
        input.updateReportedByteCount((int) numSkipped);
      }

      // To avoid dropped records, do not advertise a new byte position
      // when we are in BYBLOCK mode and the start position is 0
      if (!(this.readMode == READ_MODE.BYBLOCK && this.startingPos == 0)) {
        this.updatePos(false);
      }
    }

    private BufferedInputStream readStreamHeader() throws IOException {
      // We are flexible enough to allow the compressed stream not to
      // start with the BZ header, so this works whether or not the
      // header is present.
      if (super.in != null) {
        bufferedIn.mark(HEADER_LEN);
        byte[] headerBytes = new byte[HEADER_LEN];
        int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
        if (actualRead != -1) {
          String header = new String(headerBytes, Charsets.UTF_8);
          if (header.compareTo(HEADER) != 0) {
            bufferedIn.reset();
          } else {
            this.isHeaderStripped = true;
            // In the case of BYBLOCK mode, we also want to strip off
            // the remaining two characters of the header.
            if (this.readMode == READ_MODE.BYBLOCK) {
              actualRead = bufferedIn.read(headerBytes, 0,
                  SUB_HEADER_LEN);
              if (actualRead != -1) {
                this.isSubHeaderStripped = true;
              }
            }
          }
        }
      }

      if (bufferedIn == null) {
        throw new IOException("Failed to read bzip2 stream.");
      }

      return bufferedIn;

    }// end of method

    public void close() throws IOException {
      if (!needsReset) {
        try {
          input.close();
          needsReset = true;
        } finally {
          super.close();
        }
      }
    }

    /**
     * This method updates the compressed stream position exactly when the
     * client of this code has read off at least one byte past any BZip2
     * end-of-block marker.
     *
     * This mechanism is very helpful for dealing with record boundaries in
     * the data.  See the constructor and next methods of
     * org.apache.hadoop.mapred.LineRecordReader for an example usage of this
     * feature.  We elaborate with an example in the following:
     *
     * Assume two different scenarios of the BZip2 compressed stream, where
     * [m] represents the end of a block, \n is the line delimiter and .
     * represents compressed data.
     *
     * ............[m]......\n.......
     *
     * ..........\n[m]......\n.......
     *
     * Assume that the split end is right after [m].  In the first case the
     * reading will stop at \n and there is no need to read one more line.
     * (The reason for reading one more line in the next() method is explained
     * in LineRecordReader.)  In the second case LineRecordReader needs to
     * read one more line (up to the second \n).  Since this codec only
     * updates the position once at least one byte past a marker has been
     * read, it is straightforward to differentiate between the two cases.
     */

    public int read(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }

      int result = 0;
      result = this.input.read(b, off, len);
      if (result == BZip2Constants.END_OF_BLOCK) {
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
      }

      if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
        // Read exactly one more byte so the reported position moves past
        // the end-of-block marker.
        result = this.input.read(b, off, 1);
        // This is the precise time to update the compressed stream position
        // for the client of this code.
        this.updatePos(true);
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
      }

      return result;

    }

    public int read() throws IOException {
      byte b[] = new byte[1];
      int result = this.read(b, 0, 1);
      return (result < 0) ? result : (b[0] & 0xff);
    }

    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        BufferedInputStream bufferedIn = readStreamHeader();
        input = new CBZip2InputStream(bufferedIn, this.readMode);
      }
    }

    public void resetState() throws IOException {
      // Cannot read from bufferedIn at this point because bufferedIn
      // might not be ready yet, as in the SequenceFile.Reader implementation.
      needsReset = true;
    }

    public long getPos() {
      return this.compressedStreamPosition;
    }

    /*
     * As the comment before the read method explains, the compressed
     * stream position is advertised once at least one byte past an
     * end-of-block marker has been read off.  There is one exception to
     * this rule: when we construct the stream we advertise the position
     * exactly at the end-of-block marker.  In the following method the
     * shouldAddOn boolean captures this exception.
     */
    private void updatePos(boolean shouldAddOn) {
      int addOn = shouldAddOn ? 1 : 0;
      this.compressedStreamPosition = this.startingPos
          + this.input.getProcessedByteCount() + addOn;
    }

  }// end of BZip2CompressionInputStream

}