001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.BufferedInputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    
026    import org.apache.hadoop.conf.Configurable;
027    import org.apache.hadoop.conf.Configuration;
028    
029    import org.apache.hadoop.classification.InterfaceAudience;
030    import org.apache.hadoop.classification.InterfaceStability;
031    import org.apache.hadoop.fs.Seekable;
032    import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
033    import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
034    import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
035    import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
036    
037    /**
038     * This class provides output and input streams for bzip2 compression
039     * and decompression.  It uses the native bzip2 library on the system
040     * if possible, else it uses a pure-Java implementation of the bzip2
041     * algorithm.  The configuration parameter
042     * io.compression.codec.bzip2.library can be used to control this
043     * behavior.
044     *
045     * In the pure-Java mode, the Compressor and Decompressor interfaces
046     * are not implemented.  Therefore, in that mode, those methods of
047     * CompressionCodec which have a Compressor or Decompressor type
048     * argument, throw UnsupportedOperationException.
049     *
050     * Currently, support for splittability is available only in the
051     * pure-Java mode; therefore, if a SplitCompressionInputStream is
052     * requested, the pure-Java implementation is used, regardless of the
053     * setting of the configuration parameter mentioned above.
054     */
055    @InterfaceAudience.Public
056    @InterfaceStability.Evolving
057    public class BZip2Codec implements Configurable, SplittableCompressionCodec {
058    
059      private static final String HEADER = "BZ";
060      private static final int HEADER_LEN = HEADER.length();
061      private static final String SUB_HEADER = "h9";
062      private static final int SUB_HEADER_LEN = SUB_HEADER.length();
063    
064      private Configuration conf;
065      
066      /**
067       * Set the configuration to be used by this object.
068       *
069       * @param conf the configuration object.
070       */
071      @Override
072      public void setConf(Configuration conf) {
073        this.conf = conf;
074      }
075      
076      /**
077       * Return the configuration used by this object.
078       *
079       * @return the configuration object used by this objec.
080       */
081      @Override
082      public Configuration getConf() {
083        return conf;
084      }
085      
086      /**
087      * Creates a new instance of BZip2Codec.
088      */
089      public BZip2Codec() { }
090    
091      /**
092       * Create a {@link CompressionOutputStream} that will write to the given
093       * {@link OutputStream}.
094       *
095       * @param out        the location for the final output stream
096       * @return a stream the user can write uncompressed data to, to have it 
097       *         compressed
098       * @throws IOException
099       */
100      @Override
101      public CompressionOutputStream createOutputStream(OutputStream out)
102          throws IOException {
103        return CompressionCodec.Util.
104            createOutputStreamWithCodecPool(this, conf, out);
105      }
106    
107      /**
108       * Create a {@link CompressionOutputStream} that will write to the given
109       * {@link OutputStream} with the given {@link Compressor}.
110       *
111       * @param out        the location for the final output stream
112       * @param compressor compressor to use
113       * @return a stream the user can write uncompressed data to, to have it 
114       *         compressed
115       * @throws IOException
116       */
117      @Override
118      public CompressionOutputStream createOutputStream(OutputStream out,
119          Compressor compressor) throws IOException {
120        return Bzip2Factory.isNativeBzip2Loaded(conf) ?
121          new CompressorStream(out, compressor, 
122                               conf.getInt("io.file.buffer.size", 4*1024)) :
123          new BZip2CompressionOutputStream(out);
124      }
125    
126      /**
127       * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
128       *
129       * @return the type of compressor needed by this codec.
130       */
131      @Override
132      public Class<? extends Compressor> getCompressorType() {
133        return Bzip2Factory.getBzip2CompressorType(conf);
134      }
135    
136      /**
137       * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
138       *
139       * @return a new compressor for use by this codec
140       */
141      @Override
142      public Compressor createCompressor() {
143        return Bzip2Factory.getBzip2Compressor(conf);
144      }
145    
146      /**
147       * Create a {@link CompressionInputStream} that will read from the given
148       * input stream and return a stream for uncompressed data.
149       *
150       * @param in the stream to read compressed bytes from
151       * @return a stream to read uncompressed bytes from
152       * @throws IOException
153       */
154      @Override
155      public CompressionInputStream createInputStream(InputStream in)
156          throws IOException {
157        return CompressionCodec.Util.
158            createInputStreamWithCodecPool(this, conf, in);
159      }
160    
161      /**
162       * Create a {@link CompressionInputStream} that will read from the given
163       * {@link InputStream} with the given {@link Decompressor}, and return a 
164       * stream for uncompressed data.
165       *
166       * @param in           the stream to read compressed bytes from
167       * @param decompressor decompressor to use
168       * @return a stream to read uncompressed bytes from
169       * @throws IOException
170       */
171      @Override
172      public CompressionInputStream createInputStream(InputStream in,
173          Decompressor decompressor) throws IOException {
174        return Bzip2Factory.isNativeBzip2Loaded(conf) ? 
175          new DecompressorStream(in, decompressor,
176                                 conf.getInt("io.file.buffer.size", 4*1024)) :
177          new BZip2CompressionInputStream(in);
178      }
179    
180      /**
181       * Creates CompressionInputStream to be used to read off uncompressed data
182       * in one of the two reading modes. i.e. Continuous or Blocked reading modes
183       *
184       * @param seekableIn The InputStream
185       * @param start The start offset into the compressed stream
186       * @param end The end offset into the compressed stream
187       * @param readMode Controls whether progress is reported continuously or
188       *                 only at block boundaries.
189       *
190       * @return CompressionInputStream for BZip2 aligned at block boundaries
191       */
192      public SplitCompressionInputStream createInputStream(InputStream seekableIn,
193          Decompressor decompressor, long start, long end, READ_MODE readMode)
194          throws IOException {
195    
196        if (!(seekableIn instanceof Seekable)) {
197          throw new IOException("seekableIn must be an instance of " +
198              Seekable.class.getName());
199        }
200    
201        //find the position of first BZip2 start up marker
202        ((Seekable)seekableIn).seek(0);
203    
204        // BZip2 start of block markers are of 6 bytes.  But the very first block
205        // also has "BZh9", making it 10 bytes.  This is the common case.  But at
206        // time stream might start without a leading BZ.
207        final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
208          CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
209        long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);
210    
211        ((Seekable)seekableIn).seek(adjStart);
212        SplitCompressionInputStream in =
213          new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);
214    
215    
216        // The following if clause handles the following case:
217        // Assume the following scenario in BZip2 compressed stream where
218        // . represent compressed data.
219        // .....[48 bit Block].....[48 bit   Block].....[48 bit Block]...
220        // ........................[47 bits][1 bit].....[48 bit Block]...
221        // ................................^[Assume a Byte alignment here]
222        // ........................................^^[current position of stream]
223        // .....................^^[We go back 10 Bytes in stream and find a Block marker]
224        // ........................................^^[We align at wrong position!]
225        // ...........................................................^^[While this pos is correct]
226    
227        if (in.getPos() <= start) {
228          ((Seekable)seekableIn).seek(start);
229          in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
230        }
231    
232        return in;
233      }
234    
235      /**
236       * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
237       *
238       * @return the type of decompressor needed by this codec.
239       */
240      @Override
241      public Class<? extends Decompressor> getDecompressorType() {
242        return Bzip2Factory.getBzip2DecompressorType(conf);
243      }
244    
245      /**
246       * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
247       *
248       * @return a new decompressor for use by this codec
249       */
250      @Override
251      public Decompressor createDecompressor() {
252        return Bzip2Factory.getBzip2Decompressor(conf);
253      }
254    
255      /**
256      * .bz2 is recognized as the default extension for compressed BZip2 files
257      *
258      * @return A String telling the default bzip2 file extension
259      */
260      @Override
261      public String getDefaultExtension() {
262        return ".bz2";
263      }
264    
265      private static class BZip2CompressionOutputStream extends
266          CompressionOutputStream {
267    
268        // class data starts here//
269        private CBZip2OutputStream output;
270        private boolean needsReset; 
271        // class data ends here//
272    
273        public BZip2CompressionOutputStream(OutputStream out)
274            throws IOException {
275          super(out);
276          needsReset = true;
277        }
278    
279        private void writeStreamHeader() throws IOException {
280          if (super.out != null) {
281            // The compressed bzip2 stream should start with the
282            // identifying characters BZ. Caller of CBZip2OutputStream
283            // i.e. this class must write these characters.
284            out.write(HEADER.getBytes());
285          }
286        }
287    
288        public void finish() throws IOException {
289          if (needsReset) {
290            // In the case that nothing is written to this stream, we still need to
291            // write out the header before closing, otherwise the stream won't be
292            // recognized by BZip2CompressionInputStream.
293            internalReset();
294          }
295          this.output.finish();
296          needsReset = true;
297        }
298    
299        private void internalReset() throws IOException {
300          if (needsReset) {
301            needsReset = false;
302            writeStreamHeader();
303            this.output = new CBZip2OutputStream(out);
304          }
305        }    
306        
307        public void resetState() throws IOException {
308          // Cannot write to out at this point because out might not be ready
309          // yet, as in SequenceFile.Writer implementation.
310          needsReset = true;
311        }
312    
313        public void write(int b) throws IOException {
314          if (needsReset) {
315            internalReset();
316          }
317          this.output.write(b);
318        }
319    
320        public void write(byte[] b, int off, int len) throws IOException {
321          if (needsReset) {
322            internalReset();
323          }
324          this.output.write(b, off, len);
325        }
326    
327        public void close() throws IOException {
328          if (needsReset) {
329            // In the case that nothing is written to this stream, we still need to
330            // write out the header before closing, otherwise the stream won't be
331            // recognized by BZip2CompressionInputStream.
332            internalReset();
333          }
334          this.output.flush();
335          this.output.close();
336          needsReset = true;
337        }
338    
339      }// end of class BZip2CompressionOutputStream
340    
/**
 * This class is capable of decompressing BZip2 data in two modes:
 * CONTINUOUS and BYBLOCK.  BYBLOCK mode makes it possible to
 * start decompression at any arbitrary position in the stream.
 *
 * This facility can therefore be used to parallelize decompression
 * of a large BZip2 file for performance reasons.  (This is exactly
 * what the Hadoop framework does.  See LineRecordReader for an
 * example.)  One can break the file (logically, of course) into
 * chunks for parallel processing.  These "splits" should be like
 * default Hadoop splits (e.g. as in the FileInputFormat getSplit method).
 * This code is designed and tested for FileInputFormat's way
 * of splitting only.
 */
355    
356      private static class BZip2CompressionInputStream extends
357          SplitCompressionInputStream {
358    
359        // class data starts here//
360        private CBZip2InputStream input;
361        boolean needsReset;
362        private BufferedInputStream bufferedIn;
363        private boolean isHeaderStripped = false;
364        private boolean isSubHeaderStripped = false;
365        private READ_MODE readMode = READ_MODE.CONTINUOUS;
366        private long startingPos = 0L;
367    
368        // Following state machine handles different states of compressed stream
369        // position
370        // HOLD : Don't advertise compressed stream position
371        // ADVERTISE : Read 1 more character and advertise stream position
372        // See more comments about it before updatePos method.
373        private enum POS_ADVERTISEMENT_STATE_MACHINE {
374          HOLD, ADVERTISE
375        };
376    
377        POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
378        long compressedStreamPosition = 0;
379    
380        // class data ends here//
381    
/**
 * Opens a stream over the entire input in CONTINUOUS mode, i.e. with a
 * start of 0 and an end of {@code Long.MAX_VALUE}.
 *
 * @param in the stream to read compressed bytes from
 * @throws IOException if the full constructor fails
 */
public BZip2CompressionInputStream(InputStream in) throws IOException {
  this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
}
385    
386        public BZip2CompressionInputStream(InputStream in, long start, long end,
387            READ_MODE readMode) throws IOException {
388          super(in, start, end);
389          needsReset = false;
390          bufferedIn = new BufferedInputStream(super.in);
391          this.startingPos = super.getPos();
392          this.readMode = readMode;
393          if (this.startingPos == 0) {
394            // We only strip header if it is start of file
395            bufferedIn = readStreamHeader();
396          }
397          input = new CBZip2InputStream(bufferedIn, readMode);
398          if (this.isHeaderStripped) {
399            input.updateReportedByteCount(HEADER_LEN);
400          }
401    
402          if (this.isSubHeaderStripped) {
403            input.updateReportedByteCount(SUB_HEADER_LEN);
404          }
405    
406          this.updatePos(false);
407        }
408    
409        private BufferedInputStream readStreamHeader() throws IOException {
410          // We are flexible enough to allow the compressed stream not to
411          // start with the header of BZ. So it works fine either we have
412          // the header or not.
413          if (super.in != null) {
414            bufferedIn.mark(HEADER_LEN);
415            byte[] headerBytes = new byte[HEADER_LEN];
416            int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
417            if (actualRead != -1) {
418              String header = new String(headerBytes);
419              if (header.compareTo(HEADER) != 0) {
420                bufferedIn.reset();
421              } else {
422                this.isHeaderStripped = true;
423                // In case of BYBLOCK mode, we also want to strip off
424                // remaining two character of the header.
425                if (this.readMode == READ_MODE.BYBLOCK) {
426                  actualRead = bufferedIn.read(headerBytes, 0,
427                      SUB_HEADER_LEN);
428                  if (actualRead != -1) {
429                    this.isSubHeaderStripped = true;
430                  }
431                }
432              }
433            }
434          }
435    
436          if (bufferedIn == null) {
437            throw new IOException("Failed to read bzip2 stream.");
438          }
439    
440          return bufferedIn;
441    
442        }// end of method
443    
444        public void close() throws IOException {
445          if (!needsReset) {
446            input.close();
447            needsReset = true;
448          }
449        }
450    
451        /**
452        * This method updates compressed stream position exactly when the
453        * client of this code has read off at least one byte passed any BZip2
454        * end of block marker.
455        *
456        * This mechanism is very helpful to deal with data level record
457        * boundaries. Please see constructor and next methods of
458        * org.apache.hadoop.mapred.LineRecordReader as an example usage of this
459        * feature.  We elaborate it with an example in the following:
460        *
461        * Assume two different scenarios of the BZip2 compressed stream, where
462        * [m] represent end of block, \n is line delimiter and . represent compressed
463        * data.
464        *
465        * ............[m]......\n.......
466        *
467        * ..........\n[m]......\n.......
468        *
469        * Assume that end is right after [m].  In the first case the reading
470        * will stop at \n and there is no need to read one more line.  (To see the
471        * reason of reading one more line in the next() method is explained in LineRecordReader.)
472        * While in the second example LineRecordReader needs to read one more line
473        * (till the second \n).  Now since BZip2Codecs only update position
474        * at least one byte passed a maker, so it is straight forward to differentiate
475        * between the two cases mentioned.
476        *
477        */
478    
479        public int read(byte[] b, int off, int len) throws IOException {
480          if (needsReset) {
481            internalReset();
482          }
483    
484          int result = 0;
485          result = this.input.read(b, off, len);
486          if (result == BZip2Constants.END_OF_BLOCK) {
487            this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
488          }
489    
490          if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
491            result = this.input.read(b, off, off + 1);
492            // This is the precise time to update compressed stream position
493            // to the client of this code.
494            this.updatePos(true);
495            this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
496          }
497    
498          return result;
499    
500        }
501    
502        public int read() throws IOException {
503          byte b[] = new byte[1];
504          int result = this.read(b, 0, 1);
505          return (result < 0) ? result : (b[0] & 0xff);
506        }
507    
508        private void internalReset() throws IOException {
509          if (needsReset) {
510            needsReset = false;
511            BufferedInputStream bufferedIn = readStreamHeader();
512            input = new CBZip2InputStream(bufferedIn, this.readMode);
513          }
514        }    
515        
516        public void resetState() throws IOException {
517          // Cannot read from bufferedIn at this point because bufferedIn
518          // might not be ready
519          // yet, as in SequenceFile.Reader implementation.
520          needsReset = true;
521        }
522    
523        public long getPos() {
524          return this.compressedStreamPosition;
525          }
526    
527        /*
528         * As the comments before read method tell that
529         * compressed stream is advertised when at least
530         * one byte passed EOB have been read off.  But
531         * there is an exception to this rule.  When we
532         * construct the stream we advertise the position
533         * exactly at EOB.  In the following method
534         * shouldAddOn boolean captures this exception.
535         *
536         */
537        private void updatePos(boolean shouldAddOn) {
538          int addOn = shouldAddOn ? 1 : 0;
539          this.compressedStreamPosition = this.startingPos
540              + this.input.getProcessedByteCount() + addOn;
541        }
542    
543      }// end of BZip2CompressionInputStream
544    
545    }