001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.IOException;
022    import java.io.InputStream;
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.fs.PositionedReadable;
027    import org.apache.hadoop.fs.Seekable;
028    /**
029     * A compression input stream.
030     *
031     * <p>Implementations are assumed to be buffered.  This permits clients to
032     * reposition the underlying input stream then call {@link #resetState()},
033     * without having to also synchronize client buffers.
034     */
035    @InterfaceAudience.Public
036    @InterfaceStability.Evolving
037    public abstract class CompressionInputStream extends InputStream implements Seekable {
038      /**
039       * The input stream to be compressed. 
040       */
041      protected final InputStream in;
042      protected long maxAvailableData = 0L;
043    
044      private Decompressor trackedDecompressor;
045    
046      /**
047       * Create a compression input stream that reads
048       * the decompressed bytes from the given stream.
049       * 
050       * @param in The input stream to be compressed.
051       * @throws IOException
052       */
053      protected CompressionInputStream(InputStream in) throws IOException {
054        if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
055            this.maxAvailableData = in.available();
056        }
057        this.in = in;
058      }
059    
060      @Override
061      public void close() throws IOException {
062        in.close();
063        if (trackedDecompressor != null) {
064          CodecPool.returnDecompressor(trackedDecompressor);
065          trackedDecompressor = null;
066        }
067      }
068      
069      /**
070       * Read bytes from the stream.
071       * Made abstract to prevent leakage to underlying stream.
072       */
073      @Override
074      public abstract int read(byte[] b, int off, int len) throws IOException;
075    
076      /**
077       * Reset the decompressor to its initial state and discard any buffered data,
078       * as the underlying stream may have been repositioned.
079       */
080      public abstract void resetState() throws IOException;
081      
082      /**
083       * This method returns the current position in the stream.
084       *
085       * @return Current position in stream as a long
086       */
087      @Override
088      public long getPos() throws IOException {
089        if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){
090          //This way of getting the current position will not work for file
091          //size which can be fit in an int and hence can not be returned by
092          //available method.
093          return (this.maxAvailableData - this.in.available());
094        }
095        else{
096          return ((Seekable)this.in).getPos();
097        }
098    
099      }
100    
101      /**
102       * This method is current not supported.
103       *
104       * @throws UnsupportedOperationException
105       */
106    
107      @Override
108      public void seek(long pos) throws UnsupportedOperationException {
109        throw new UnsupportedOperationException();
110      }
111    
112      /**
113       * This method is current not supported.
114       *
115       * @throws UnsupportedOperationException
116       */
117      @Override
118      public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException {
119        throw new UnsupportedOperationException();
120      }
121    
122      void setTrackedDecompressor(Decompressor decompressor) {
123        trackedDecompressor = decompressor;
124      }
125    }