001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.BufferedInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    
026    import org.apache.hadoop.conf.Configurable;
027    import org.apache.hadoop.conf.Configuration;
028    
029    import org.apache.hadoop.classification.InterfaceAudience;
030    import org.apache.hadoop.classification.InterfaceStability;
031    import org.apache.hadoop.fs.Seekable;
032    import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
033    import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
034    import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
035    import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
036    
037    /**
038     * This class provides output and input streams for bzip2 compression
039     * and decompression.  It uses the native bzip2 library on the system
040     * if possible, else it uses a pure-Java implementation of the bzip2
041     * algorithm.  The configuration parameter
042     * io.compression.codec.bzip2.library can be used to control this
043     * behavior.
044     *
045     * In the pure-Java mode, the Compressor and Decompressor interfaces
046     * are not implemented.  Therefore, in that mode, those methods of
047     * CompressionCodec which have a Compressor or Decompressor type
048     * argument, throw UnsupportedOperationException.
049     *
050     * Currently, support for splittability is available only in the
051     * pure-Java mode; therefore, if a SplitCompressionInputStream is
052     * requested, the pure-Java implementation is used, regardless of the
053     * setting of the configuration parameter mentioned above.
054     */
055    @InterfaceAudience.Public
056    @InterfaceStability.Evolving
057    public class BZip2Codec implements Configurable, SplittableCompressionCodec {
058    
059      private static final String HEADER = "BZ";
060      private static final int HEADER_LEN = HEADER.length();
061      private static final String SUB_HEADER = "h9";
062      private static final int SUB_HEADER_LEN = SUB_HEADER.length();
063    
064      private Configuration conf;
065      
066      /**
067       * Set the configuration to be used by this object.
068       *
069       * @param conf the configuration object.
070       */
071      @Override
072      public void setConf(Configuration conf) {
073        this.conf = conf;
074      }
075      
076      /**
077       * Return the configuration used by this object.
078       *
079       * @return the configuration object used by this objec.
080       */
081      @Override
082      public Configuration getConf() {
083        return conf;
084      }
085      
086      /**
087      * Creates a new instance of BZip2Codec.
088      */
089      public BZip2Codec() { }
090    
091      /**
092       * Create a {@link CompressionOutputStream} that will write to the given
093       * {@link OutputStream}.
094       *
095       * @param out        the location for the final output stream
096       * @return a stream the user can write uncompressed data to, to have it 
097       *         compressed
098       * @throws IOException
099       */
100      @Override
101      public CompressionOutputStream createOutputStream(OutputStream out)
102          throws IOException {
103        return createOutputStream(out, createCompressor());
104      }
105    
106      /**
107       * Create a {@link CompressionOutputStream} that will write to the given
108       * {@link OutputStream} with the given {@link Compressor}.
109       *
110       * @param out        the location for the final output stream
111       * @param compressor compressor to use
112       * @return a stream the user can write uncompressed data to, to have it 
113       *         compressed
114       * @throws IOException
115       */
116      @Override
117      public CompressionOutputStream createOutputStream(OutputStream out,
118          Compressor compressor) throws IOException {
119        return Bzip2Factory.isNativeBzip2Loaded(conf) ?
120          new CompressorStream(out, compressor, 
121                               conf.getInt("io.file.buffer.size", 4*1024)) :
122          new BZip2CompressionOutputStream(out);
123      }
124    
125      /**
126       * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
127       *
128       * @return the type of compressor needed by this codec.
129       */
130      @Override
131      public Class<? extends Compressor> getCompressorType() {
132        return Bzip2Factory.getBzip2CompressorType(conf);
133      }
134    
135      /**
136       * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
137       *
138       * @return a new compressor for use by this codec
139       */
140      @Override
141      public Compressor createCompressor() {
142        return Bzip2Factory.getBzip2Compressor(conf);
143      }
144    
145      /**
146       * Create a {@link CompressionInputStream} that will read from the given
147       * input stream and return a stream for uncompressed data.
148       *
149       * @param in the stream to read compressed bytes from
150       * @return a stream to read uncompressed bytes from
151       * @throws IOException
152       */
153      @Override
154      public CompressionInputStream createInputStream(InputStream in)
155          throws IOException {
156        return createInputStream(in, createDecompressor());
157      }
158    
159      /**
160       * Create a {@link CompressionInputStream} that will read from the given
161       * {@link InputStream} with the given {@link Decompressor}, and return a 
162       * stream for uncompressed data.
163       *
164       * @param in           the stream to read compressed bytes from
165       * @param decompressor decompressor to use
166       * @return a stream to read uncompressed bytes from
167       * @throws IOException
168       */
169      @Override
170      public CompressionInputStream createInputStream(InputStream in,
171          Decompressor decompressor) throws IOException {
172        return Bzip2Factory.isNativeBzip2Loaded(conf) ? 
173          new DecompressorStream(in, decompressor,
174                                 conf.getInt("io.file.buffer.size", 4*1024)) :
175          new BZip2CompressionInputStream(in);
176      }
177    
178      /**
179       * Creates CompressionInputStream to be used to read off uncompressed data
180       * in one of the two reading modes. i.e. Continuous or Blocked reading modes
181       *
182       * @param seekableIn The InputStream
183       * @param start The start offset into the compressed stream
184       * @param end The end offset into the compressed stream
185       * @param readMode Controls whether progress is reported continuously or
186       *                 only at block boundaries.
187       *
188       * @return CompressionInputStream for BZip2 aligned at block boundaries
189       */
190      public SplitCompressionInputStream createInputStream(InputStream seekableIn,
191          Decompressor decompressor, long start, long end, READ_MODE readMode)
192          throws IOException {
193    
194        if (!(seekableIn instanceof Seekable)) {
195          throw new IOException("seekableIn must be an instance of " +
196              Seekable.class.getName());
197        }
198    
199        //find the position of first BZip2 start up marker
200        ((Seekable)seekableIn).seek(0);
201    
202        // BZip2 start of block markers are of 6 bytes.  But the very first block
203        // also has "BZh9", making it 10 bytes.  This is the common case.  But at
204        // time stream might start without a leading BZ.
205        final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
206          CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
207        long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
208    
209        ((Seekable)seekableIn).seek(adjStart);
210        SplitCompressionInputStream in =
211          new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
212    
213    
214        // The following if clause handles the following case:
215        // Assume the following scenario in BZip2 compressed stream where
216        // . represent compressed data.
217        // .....[48 bit Block].....[48 bit   Block].....[48 bit Block]...
218        // ........................[47 bits][1 bit].....[48 bit Block]...
219        // ................................^[Assume a Byte alignment here]
220        // ........................................^^[current position of stream]
221        // .....................^^[We go back 10 Bytes in stream and find a Block marker]
222        // ........................................^^[We align at wrong position!]
223        // ...........................................................^^[While this pos is correct]
224    
225        if (in.getPos() <= start) {
226          ((Seekable)seekableIn).seek(start);
227          in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
228        }
229    
230        return in;
231      }
232    
233      /**
234       * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
235       *
236       * @return the type of decompressor needed by this codec.
237       */
238      @Override
239      public Class<? extends Decompressor> getDecompressorType() {
240        return Bzip2Factory.getBzip2DecompressorType(conf);
241      }
242    
243      /**
244       * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
245       *
246       * @return a new decompressor for use by this codec
247       */
248      @Override
249      public Decompressor createDecompressor() {
250        return Bzip2Factory.getBzip2Decompressor(conf);
251      }
252    
253      /**
254      * .bz2 is recognized as the default extension for compressed BZip2 files
255      *
256      * @return A String telling the default bzip2 file extension
257      */
258      @Override
259      public String getDefaultExtension() {
260        return ".bz2";
261      }
262    
263      private static class BZip2CompressionOutputStream extends
264          CompressionOutputStream {
265    
266        // class data starts here//
267        private CBZip2OutputStream output;
268        private boolean needsReset; 
269        // class data ends here//
270    
271        public BZip2CompressionOutputStream(OutputStream out)
272            throws IOException {
273          super(out);
274          needsReset = true;
275        }
276    
277        private void writeStreamHeader() throws IOException {
278          if (super.out != null) {
279            // The compressed bzip2 stream should start with the
280            // identifying characters BZ. Caller of CBZip2OutputStream
281            // i.e. this class must write these characters.
282            out.write(HEADER.getBytes());
283          }
284        }
285    
286        public void finish() throws IOException {
287          if (needsReset) {
288            // In the case that nothing is written to this stream, we still need to
289            // write out the header before closing, otherwise the stream won't be
290            // recognized by BZip2CompressionInputStream.
291            internalReset();
292          }
293          this.output.finish();
294          needsReset = true;
295        }
296    
297        private void internalReset() throws IOException {
298          if (needsReset) {
299            needsReset = false;
300            writeStreamHeader();
301            this.output = new CBZip2OutputStream(out);
302          }
303        }    
304        
305        public void resetState() throws IOException {
306          // Cannot write to out at this point because out might not be ready
307          // yet, as in SequenceFile.Writer implementation.
308          needsReset = true;
309        }
310    
311        public void write(int b) throws IOException {
312          if (needsReset) {
313            internalReset();
314          }
315          this.output.write(b);
316        }
317    
318        public void write(byte[] b, int off, int len) throws IOException {
319          if (needsReset) {
320            internalReset();
321          }
322          this.output.write(b, off, len);
323        }
324    
325        public void close() throws IOException {
326          if (needsReset) {
327            // In the case that nothing is written to this stream, we still need to
328            // write out the header before closing, otherwise the stream won't be
329            // recognized by BZip2CompressionInputStream.
330            internalReset();
331          }
332          this.output.flush();
333          this.output.close();
334          needsReset = true;
335        }
336    
337      }// end of class BZip2CompressionOutputStream
338    
339      /**
340       * This class is capable to de-compress BZip2 data in two modes;
341       * CONTINOUS and BYBLOCK.  BYBLOCK mode makes it possible to
342       * do decompression starting any arbitrary position in the stream.
343       *
344       * So this facility can easily be used to parallelize decompression
345       * of a large BZip2 file for performance reasons.  (It is exactly
346       * done so for Hadoop framework.  See LineRecordReader for an
347       * example).  So one can break the file (of course logically) into
348       * chunks for parallel processing.  These "splits" should be like
349       * default Hadoop splits (e.g as in FileInputFormat getSplit metod).
350       * So this code is designed and tested for FileInputFormat's way
351       * of splitting only.
352       */
353    
354      private static class BZip2CompressionInputStream extends
355          SplitCompressionInputStream {
356    
357        // class data starts here//
358        private CBZip2InputStream input;
359        boolean needsReset;
360        private BufferedInputStream bufferedIn;
361        private boolean isHeaderStripped = false;
362        private boolean isSubHeaderStripped = false;
363        private READ_MODE readMode = READ_MODE.CONTINUOUS;
364        private long startingPos = 0L;
365    
366        // Following state machine handles different states of compressed stream
367        // position
368        // HOLD : Don't advertise compressed stream position
369        // ADVERTISE : Read 1 more character and advertise stream position
370        // See more comments about it before updatePos method.
371        private enum POS_ADVERTISEMENT_STATE_MACHINE {
372          HOLD, ADVERTISE
373        };
374    
375        POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
376        long compressedStreamPosition = 0;
377    
378        // class data ends here//
379    
380        public BZip2CompressionInputStream(InputStream in) throws IOException {
381          this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
382        }
383    
384        public BZip2CompressionInputStream(InputStream in, long start, long end,
385            READ_MODE readMode) throws IOException {
386          super(in, start, end);
387          needsReset = false;
388          bufferedIn = new BufferedInputStream(super.in);
389          this.startingPos = super.getPos();
390          this.readMode = readMode;
391          if (this.startingPos == 0) {
392            // We only strip header if it is start of file
393            bufferedIn = readStreamHeader();
394          }
395          input = new CBZip2InputStream(bufferedIn, readMode);
396          if (this.isHeaderStripped) {
397            input.updateReportedByteCount(HEADER_LEN);
398          }
399    
400          if (this.isSubHeaderStripped) {
401            input.updateReportedByteCount(SUB_HEADER_LEN);
402          }
403    
404          this.updatePos(false);
405        }
406    
407        private BufferedInputStream readStreamHeader() throws IOException {
408          // We are flexible enough to allow the compressed stream not to
409          // start with the header of BZ. So it works fine either we have
410          // the header or not.
411          if (super.in != null) {
412            bufferedIn.mark(HEADER_LEN);
413            byte[] headerBytes = new byte[HEADER_LEN];
414            int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
415            if (actualRead != -1) {
416              String header = new String(headerBytes);
417              if (header.compareTo(HEADER) != 0) {
418                bufferedIn.reset();
419              } else {
420                this.isHeaderStripped = true;
421                // In case of BYBLOCK mode, we also want to strip off
422                // remaining two character of the header.
423                if (this.readMode == READ_MODE.BYBLOCK) {
424                  actualRead = bufferedIn.read(headerBytes, 0,
425                      SUB_HEADER_LEN);
426                  if (actualRead != -1) {
427                    this.isSubHeaderStripped = true;
428                  }
429                }
430              }
431            }
432          }
433    
434          if (bufferedIn == null) {
435            throw new IOException("Failed to read bzip2 stream.");
436          }
437    
438          return bufferedIn;
439    
440        }// end of method
441    
442        public void close() throws IOException {
443          if (!needsReset) {
444            input.close();
445            needsReset = true;
446          }
447        }
448    
449        /**
450        * This method updates compressed stream position exactly when the
451        * client of this code has read off at least one byte passed any BZip2
452        * end of block marker.
453        *
454        * This mechanism is very helpful to deal with data level record
455        * boundaries. Please see constructor and next methods of
456        * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
457        * feature.  We elaborate it with an example in the following:
458        *
459        * Assume two different scenarios of the BZip2 compressed stream, where
460        * [m] represent end of block, \n is line delimiter and . represent compressed
461        * data.
462        *
463        * ............[m]......\n.......
464        *
465        * ..........\n[m]......\n.......
466        *
467        * Assume that end is right after [m].  In the first case the reading
468        * will stop at \n and there is no need to read one more line.  (To see the
469        * reason of reading one more line in the next() method is explained in LineRecordReader.)
470        * While in the second example LineRecordReader needs to read one more line
471        * (till the second \n).  Now since BZip2Codecs only update position
472        * at least one byte passed a maker, so it is straight forward to differentiate
473        * between the two cases mentioned.
474        *
475        */
476    
477        public int read(byte[] b, int off, int len) throws IOException {
478          if (needsReset) {
479            internalReset();
480          }
481    
482          int result = 0;
483          result = this.input.read(b, off, len);
484          if (result == BZip2Constants.END_OF_BLOCK) {
485            this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
486          }
487    
488          if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
489            result = this.input.read(b, off, off + 1);
490            // This is the precise time to update compressed stream position
491            // to the client of this code.
492            this.updatePos(true);
493            this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
494          }
495    
496          return result;
497    
498        }
499    
500        public int read() throws IOException {
501          byte b[] = new byte[1];
502          int result = this.read(b, 0, 1);
503          return (result < 0) ? result : (b[0] & 0xff);
504        }
505    
506        private void internalReset() throws IOException {
507          if (needsReset) {
508            needsReset = false;
509            BufferedInputStream bufferedIn = readStreamHeader();
510            input = new CBZip2InputStream(bufferedIn, this.readMode);
511          }
512        }    
513        
514        public void resetState() throws IOException {
515          // Cannot read from bufferedIn at this point because bufferedIn
516          // might not be ready
517          // yet, as in SequenceFile.Reader implementation.
518          needsReset = true;
519        }
520    
521        public long getPos() {
522          return this.compressedStreamPosition;
523          }
524    
525        /*
526         * As the comments before read method tell that
527         * compressed stream is advertised when at least
528         * one byte passed EOB have been read off.  But
529         * there is an exception to this rule.  When we
530         * construct the stream we advertise the position
531         * exactly at EOB.  In the following method
532         * shouldAddOn boolean captures this exception.
533         *
534         */
535        private void updatePos(boolean shouldAddOn) {
536          int addOn = shouldAddOn ? 1 : 0;
537          this.compressedStreamPosition = this.startingPos
538              + this.input.getProcessedByteCount() + addOn;
539        }
540    
541      }// end of BZip2CompressionInputStream
542    
543    }