001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.io.compress;
020    
021    import java.io.IOException;
022    import java.io.InputStream;
023    
024    import org.apache.hadoop.classification.InterfaceAudience;
025    import org.apache.hadoop.classification.InterfaceStability;
026    import org.apache.hadoop.fs.PositionedReadable;
027    import org.apache.hadoop.fs.Seekable;
028    /**
029     * A compression input stream.
030     *
031     * <p>Implementations are assumed to be buffered.  This permits clients to
032     * reposition the underlying input stream then call {@link #resetState()},
033     * without having to also synchronize client buffers.
034     */
035    @InterfaceAudience.Public
036    @InterfaceStability.Evolving
037    public abstract class CompressionInputStream extends InputStream implements Seekable {
038      /**
039       * The input stream to be compressed. 
040       */
041      protected final InputStream in;
042      protected long maxAvailableData = 0L;
043    
044      /**
045       * Create a compression input stream that reads
046       * the decompressed bytes from the given stream.
047       * 
048       * @param in The input stream to be compressed.
049       * @throws IOException
050       */
051      protected CompressionInputStream(InputStream in) throws IOException {
052        if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
053            this.maxAvailableData = in.available();
054        }
055        this.in = in;
056      }
057    
058      @Override
059      public void close() throws IOException {
060        in.close();
061      }
062      
063      /**
064       * Read bytes from the stream.
065       * Made abstract to prevent leakage to underlying stream.
066       */
067      @Override
068      public abstract int read(byte[] b, int off, int len) throws IOException;
069    
070      /**
071       * Reset the decompressor to its initial state and discard any buffered data,
072       * as the underlying stream may have been repositioned.
073       */
074      public abstract void resetState() throws IOException;
075      
076      /**
077       * This method returns the current position in the stream.
078       *
079       * @return Current position in stream as a long
080       */
081      @Override
082      public long getPos() throws IOException {
083        if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)){
084          //This way of getting the current position will not work for file
085          //size which can be fit in an int and hence can not be returned by
086          //available method.
087          return (this.maxAvailableData - this.in.available());
088        }
089        else{
090          return ((Seekable)this.in).getPos();
091        }
092    
093      }
094    
095      /**
096       * This method is current not supported.
097       *
098       * @throws UnsupportedOperationException
099       */
100    
101      @Override
102      public void seek(long pos) throws UnsupportedOperationException {
103        throw new UnsupportedOperationException();
104      }
105    
106      /**
107       * This method is current not supported.
108       *
109       * @throws UnsupportedOperationException
110       */
111      @Override
112      public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException {
113        throw new UnsupportedOperationException();
114      }
115    }