001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io.compress; 020 021import java.io.BufferedInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.OutputStream; 025 026import org.apache.commons.io.Charsets; 027import org.apache.hadoop.conf.Configurable; 028import org.apache.hadoop.conf.Configuration; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.fs.Seekable; 033import org.apache.hadoop.io.compress.bzip2.BZip2Constants; 034import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream; 035import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream; 036import org.apache.hadoop.io.compress.bzip2.Bzip2Factory; 037 038/** 039 * This class provides output and input streams for bzip2 compression 040 * and decompression. It uses the native bzip2 library on the system 041 * if possible, else it uses a pure-Java implementation of the bzip2 042 * algorithm. The configuration parameter 043 * io.compression.codec.bzip2.library can be used to control this 044 * behavior. 045 * 046 * In the pure-Java mode, the Compressor and Decompressor interfaces 047 * are not implemented. Therefore, in that mode, those methods of 048 * CompressionCodec which have a Compressor or Decompressor type 049 * argument, throw UnsupportedOperationException. 050 * 051 * Currently, support for splittability is available only in the 052 * pure-Java mode; therefore, if a SplitCompressionInputStream is 053 * requested, the pure-Java implementation is used, regardless of the 054 * setting of the configuration parameter mentioned above. 055 */ 056@InterfaceAudience.Public 057@InterfaceStability.Evolving 058public class BZip2Codec implements Configurable, SplittableCompressionCodec { 059 060 private static final String HEADER = "BZ"; 061 private static final int HEADER_LEN = HEADER.length(); 062 private static final String SUB_HEADER = "h9"; 063 private static final int SUB_HEADER_LEN = SUB_HEADER.length(); 064 065 private Configuration conf; 066 067 /** 068 * Set the configuration to be used by this object. 069 * 070 * @param conf the configuration object. 071 */ 072 @Override 073 public void setConf(Configuration conf) { 074 this.conf = conf; 075 } 076 077 /** 078 * Return the configuration used by this object. 079 * 080 * @return the configuration object used by this objec. 081 */ 082 @Override 083 public Configuration getConf() { 084 return conf; 085 } 086 087 /** 088 * Creates a new instance of BZip2Codec. 089 */ 090 public BZip2Codec() { } 091 092 /** 093 * Create a {@link CompressionOutputStream} that will write to the given 094 * {@link OutputStream}. 095 * 096 * @param out the location for the final output stream 097 * @return a stream the user can write uncompressed data to, to have it 098 * compressed 099 * @throws IOException 100 */ 101 @Override 102 public CompressionOutputStream createOutputStream(OutputStream out) 103 throws IOException { 104 return CompressionCodec.Util. 105 createOutputStreamWithCodecPool(this, conf, out); 106 } 107 108 /** 109 * Create a {@link CompressionOutputStream} that will write to the given 110 * {@link OutputStream} with the given {@link Compressor}. 111 * 112 * @param out the location for the final output stream 113 * @param compressor compressor to use 114 * @return a stream the user can write uncompressed data to, to have it 115 * compressed 116 * @throws IOException 117 */ 118 @Override 119 public CompressionOutputStream createOutputStream(OutputStream out, 120 Compressor compressor) throws IOException { 121 return Bzip2Factory.isNativeBzip2Loaded(conf) ? 122 new CompressorStream(out, compressor, 123 conf.getInt("io.file.buffer.size", 4*1024)) : 124 new BZip2CompressionOutputStream(out); 125 } 126 127 /** 128 * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. 129 * 130 * @return the type of compressor needed by this codec. 131 */ 132 @Override 133 public Class<? extends Compressor> getCompressorType() { 134 return Bzip2Factory.getBzip2CompressorType(conf); 135 } 136 137 /** 138 * Create a new {@link Compressor} for use by this {@link CompressionCodec}. 139 * 140 * @return a new compressor for use by this codec 141 */ 142 @Override 143 public Compressor createCompressor() { 144 return Bzip2Factory.getBzip2Compressor(conf); 145 } 146 147 /** 148 * Create a {@link CompressionInputStream} that will read from the given 149 * input stream and return a stream for uncompressed data. 150 * 151 * @param in the stream to read compressed bytes from 152 * @return a stream to read uncompressed bytes from 153 * @throws IOException 154 */ 155 @Override 156 public CompressionInputStream createInputStream(InputStream in) 157 throws IOException { 158 return CompressionCodec.Util. 159 createInputStreamWithCodecPool(this, conf, in); 160 } 161 162 /** 163 * Create a {@link CompressionInputStream} that will read from the given 164 * {@link InputStream} with the given {@link Decompressor}, and return a 165 * stream for uncompressed data. 166 * 167 * @param in the stream to read compressed bytes from 168 * @param decompressor decompressor to use 169 * @return a stream to read uncompressed bytes from 170 * @throws IOException 171 */ 172 @Override 173 public CompressionInputStream createInputStream(InputStream in, 174 Decompressor decompressor) throws IOException { 175 return Bzip2Factory.isNativeBzip2Loaded(conf) ? 176 new DecompressorStream(in, decompressor, 177 conf.getInt("io.file.buffer.size", 4*1024)) : 178 new BZip2CompressionInputStream(in); 179 } 180 181 /** 182 * Creates CompressionInputStream to be used to read off uncompressed data 183 * in one of the two reading modes. i.e. Continuous or Blocked reading modes 184 * 185 * @param seekableIn The InputStream 186 * @param start The start offset into the compressed stream 187 * @param end The end offset into the compressed stream 188 * @param readMode Controls whether progress is reported continuously or 189 * only at block boundaries. 190 * 191 * @return CompressionInputStream for BZip2 aligned at block boundaries 192 */ 193 public SplitCompressionInputStream createInputStream(InputStream seekableIn, 194 Decompressor decompressor, long start, long end, READ_MODE readMode) 195 throws IOException { 196 197 if (!(seekableIn instanceof Seekable)) { 198 throw new IOException("seekableIn must be an instance of " + 199 Seekable.class.getName()); 200 } 201 202 ((Seekable)seekableIn).seek(start); 203 return new BZip2CompressionInputStream(seekableIn, start, end, readMode); 204 } 205 206 /** 207 * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. 208 * 209 * @return the type of decompressor needed by this codec. 210 */ 211 @Override 212 public Class<? extends Decompressor> getDecompressorType() { 213 return Bzip2Factory.getBzip2DecompressorType(conf); 214 } 215 216 /** 217 * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. 218 * 219 * @return a new decompressor for use by this codec 220 */ 221 @Override 222 public Decompressor createDecompressor() { 223 return Bzip2Factory.getBzip2Decompressor(conf); 224 } 225 226 /** 227 * .bz2 is recognized as the default extension for compressed BZip2 files 228 * 229 * @return A String telling the default bzip2 file extension 230 */ 231 @Override 232 public String getDefaultExtension() { 233 return ".bz2"; 234 } 235 236 private static class BZip2CompressionOutputStream extends 237 CompressionOutputStream { 238 239 // class data starts here// 240 private CBZip2OutputStream output; 241 private boolean needsReset; 242 // class data ends here// 243 244 public BZip2CompressionOutputStream(OutputStream out) 245 throws IOException { 246 super(out); 247 needsReset = true; 248 } 249 250 private void writeStreamHeader() throws IOException { 251 if (super.out != null) { 252 // The compressed bzip2 stream should start with the 253 // identifying characters BZ. Caller of CBZip2OutputStream 254 // i.e. this class must write these characters. 255 out.write(HEADER.getBytes(Charsets.UTF_8)); 256 } 257 } 258 259 public void finish() throws IOException { 260 if (needsReset) { 261 // In the case that nothing is written to this stream, we still need to 262 // write out the header before closing, otherwise the stream won't be 263 // recognized by BZip2CompressionInputStream. 264 internalReset(); 265 } 266 this.output.finish(); 267 needsReset = true; 268 } 269 270 private void internalReset() throws IOException { 271 if (needsReset) { 272 needsReset = false; 273 writeStreamHeader(); 274 this.output = new CBZip2OutputStream(out); 275 } 276 } 277 278 public void resetState() throws IOException { 279 // Cannot write to out at this point because out might not be ready 280 // yet, as in SequenceFile.Writer implementation. 281 needsReset = true; 282 } 283 284 public void write(int b) throws IOException { 285 if (needsReset) { 286 internalReset(); 287 } 288 this.output.write(b); 289 } 290 291 public void write(byte[] b, int off, int len) throws IOException { 292 if (needsReset) { 293 internalReset(); 294 } 295 this.output.write(b, off, len); 296 } 297 298 public void close() throws IOException { 299 try { 300 super.close(); 301 } finally { 302 output.close(); 303 } 304 } 305 306 }// end of class BZip2CompressionOutputStream 307 308 /** 309 * This class is capable to de-compress BZip2 data in two modes; 310 * CONTINOUS and BYBLOCK. BYBLOCK mode makes it possible to 311 * do decompression starting any arbitrary position in the stream. 312 * 313 * So this facility can easily be used to parallelize decompression 314 * of a large BZip2 file for performance reasons. (It is exactly 315 * done so for Hadoop framework. See LineRecordReader for an 316 * example). So one can break the file (of course logically) into 317 * chunks for parallel processing. These "splits" should be like 318 * default Hadoop splits (e.g as in FileInputFormat getSplit metod). 319 * So this code is designed and tested for FileInputFormat's way 320 * of splitting only. 321 */ 322 323 private static class BZip2CompressionInputStream extends 324 SplitCompressionInputStream { 325 326 // class data starts here// 327 private CBZip2InputStream input; 328 boolean needsReset; 329 private BufferedInputStream bufferedIn; 330 private boolean isHeaderStripped = false; 331 private boolean isSubHeaderStripped = false; 332 private READ_MODE readMode = READ_MODE.CONTINUOUS; 333 private long startingPos = 0L; 334 335 // Following state machine handles different states of compressed stream 336 // position 337 // HOLD : Don't advertise compressed stream position 338 // ADVERTISE : Read 1 more character and advertise stream position 339 // See more comments about it before updatePos method. 340 private enum POS_ADVERTISEMENT_STATE_MACHINE { 341 HOLD, ADVERTISE 342 }; 343 344 POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; 345 long compressedStreamPosition = 0; 346 347 // class data ends here// 348 349 public BZip2CompressionInputStream(InputStream in) throws IOException { 350 this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS); 351 } 352 353 public BZip2CompressionInputStream(InputStream in, long start, long end, 354 READ_MODE readMode) throws IOException { 355 super(in, start, end); 356 needsReset = false; 357 bufferedIn = new BufferedInputStream(super.in); 358 this.startingPos = super.getPos(); 359 this.readMode = readMode; 360 if (this.startingPos == 0) { 361 // We only strip header if it is start of file 362 bufferedIn = readStreamHeader(); 363 } 364 input = new CBZip2InputStream(bufferedIn, readMode); 365 if (this.isHeaderStripped) { 366 input.updateReportedByteCount(HEADER_LEN); 367 } 368 369 if (this.isSubHeaderStripped) { 370 input.updateReportedByteCount(SUB_HEADER_LEN); 371 } 372 373 this.updatePos(false); 374 } 375 376 private BufferedInputStream readStreamHeader() throws IOException { 377 // We are flexible enough to allow the compressed stream not to 378 // start with the header of BZ. So it works fine either we have 379 // the header or not. 380 if (super.in != null) { 381 bufferedIn.mark(HEADER_LEN); 382 byte[] headerBytes = new byte[HEADER_LEN]; 383 int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN); 384 if (actualRead != -1) { 385 String header = new String(headerBytes, Charsets.UTF_8); 386 if (header.compareTo(HEADER) != 0) { 387 bufferedIn.reset(); 388 } else { 389 this.isHeaderStripped = true; 390 // In case of BYBLOCK mode, we also want to strip off 391 // remaining two character of the header. 392 if (this.readMode == READ_MODE.BYBLOCK) { 393 actualRead = bufferedIn.read(headerBytes, 0, 394 SUB_HEADER_LEN); 395 if (actualRead != -1) { 396 this.isSubHeaderStripped = true; 397 } 398 } 399 } 400 } 401 } 402 403 if (bufferedIn == null) { 404 throw new IOException("Failed to read bzip2 stream."); 405 } 406 407 return bufferedIn; 408 409 }// end of method 410 411 public void close() throws IOException { 412 if (!needsReset) { 413 try { 414 input.close(); 415 needsReset = true; 416 } finally { 417 super.close(); 418 } 419 } 420 } 421 422 /** 423 * This method updates compressed stream position exactly when the 424 * client of this code has read off at least one byte passed any BZip2 425 * end of block marker. 426 * 427 * This mechanism is very helpful to deal with data level record 428 * boundaries. Please see constructor and next methods of 429 * org.apache.hadoop.mapred.LineRecordReader as an example usage of this 430 * feature. We elaborate it with an example in the following: 431 * 432 * Assume two different scenarios of the BZip2 compressed stream, where 433 * [m] represent end of block, \n is line delimiter and . represent compressed 434 * data. 435 * 436 * ............[m]......\n....... 437 * 438 * ..........\n[m]......\n....... 439 * 440 * Assume that end is right after [m]. In the first case the reading 441 * will stop at \n and there is no need to read one more line. (To see the 442 * reason of reading one more line in the next() method is explained in LineRecordReader.) 443 * While in the second example LineRecordReader needs to read one more line 444 * (till the second \n). Now since BZip2Codecs only update position 445 * at least one byte passed a maker, so it is straight forward to differentiate 446 * between the two cases mentioned. 447 * 448 */ 449 450 public int read(byte[] b, int off, int len) throws IOException { 451 if (needsReset) { 452 internalReset(); 453 } 454 455 int result = 0; 456 result = this.input.read(b, off, len); 457 if (result == BZip2Constants.END_OF_BLOCK) { 458 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE; 459 } 460 461 if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) { 462 result = this.input.read(b, off, off + 1); 463 // This is the precise time to update compressed stream position 464 // to the client of this code. 465 this.updatePos(true); 466 this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD; 467 } 468 469 return result; 470 471 } 472 473 public int read() throws IOException { 474 byte b[] = new byte[1]; 475 int result = this.read(b, 0, 1); 476 return (result < 0) ? result : (b[0] & 0xff); 477 } 478 479 private void internalReset() throws IOException { 480 if (needsReset) { 481 needsReset = false; 482 BufferedInputStream bufferedIn = readStreamHeader(); 483 input = new CBZip2InputStream(bufferedIn, this.readMode); 484 } 485 } 486 487 public void resetState() throws IOException { 488 // Cannot read from bufferedIn at this point because bufferedIn 489 // might not be ready 490 // yet, as in SequenceFile.Reader implementation. 491 needsReset = true; 492 } 493 494 public long getPos() { 495 return this.compressedStreamPosition; 496 } 497 498 /* 499 * As the comments before read method tell that 500 * compressed stream is advertised when at least 501 * one byte passed EOB have been read off. But 502 * there is an exception to this rule. When we 503 * construct the stream we advertise the position 504 * exactly at EOB. In the following method 505 * shouldAddOn boolean captures this exception. 506 * 507 */ 508 private void updatePos(boolean shouldAddOn) { 509 int addOn = shouldAddOn ? 1 : 0; 510 this.compressedStreamPosition = this.startingPos 511 + this.input.getProcessedByteCount() + addOn; 512 } 513 514 }// end of BZip2CompressionInputStream 515 516}